Storm Accessing CKafka

Last updated: 2026-01-20 17:10:14
Storm is a distributed real-time computing framework that processes streaming data and provides general-purpose distributed RPC. It can process events with sub-second latency, which makes it suitable for real-time data processing scenarios with strict latency requirements.

How Storm Works

A Storm cluster has two types of nodes: the control node (Master Node) and the work node (Worker Node). The Master Node runs the Nimbus process, which is responsible for resource allocation and status monitoring. The Worker Node runs the Supervisor process, which listens for assigned tasks and launches executors. The entire Storm cluster relies on ZooKeeper to store shared data, monitor cluster status, and distribute tasks.
A data processing program that a user submits to Storm is called a topology. The smallest unit of message it processes is a tuple, which is an array of arbitrary objects. A topology consists of spouts and bolts: a spout is a source that produces tuples, and a bolt can subscribe to the tuples emitted by any spout or bolt and process them.
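The relationship between tuples, spouts, and bolts can be sketched in plain Java. This is only an illustrative model: MiniTopology and its methods are hypothetical names, not Storm classes, and the sketch ignores distribution, acking, and parallelism.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of a topology. These are NOT Storm classes.
class MiniTopology {

    // A tuple is modeled as a list of arbitrary objects.
    static List<Object> tuple(Object... values) {
        return Arrays.asList(values);
    }

    // A spout is a source of tuples. This one emits a fixed number of them.
    static List<List<Object>> spout(int count) {
        List<List<Object>> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add(tuple("sentence-" + i));
        }
        return out;
    }

    // A bolt subscribes to an upstream stream and emits one new tuple per input tuple.
    static List<List<Object>> bolt(List<List<Object>> upstream,
                                   Function<List<Object>, List<Object>> logic) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> t : upstream) {
            out.add(logic.apply(t));
        }
        return out;
    }

    public static void main(String[] args) {
        // spout -> bolt (uppercase) -> bolt (append the string length)
        List<List<Object>> source = spout(3);
        List<List<Object>> upper = bolt(source, t -> tuple(((String) t.get(0)).toUpperCase()));
        List<List<Object>> sized = bolt(upper, t -> tuple(t.get(0), ((String) t.get(0)).length()));
        sized.forEach(System.out::println);
    }
}
```

The second bolt subscribes to the output of the first one, which mirrors how a bolt in a real topology can consume tuples from either a spout or another bolt.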



Storm with CKafka

Storm can read from CKafka through a spout, consuming data for processing. It can also write to CKafka through a bolt, storing processed data for consumption by other components.

Test Environment

Operating system: CentOS 6.8

Package    Version
Maven      3.5.0
Storm      2.1.0
SSH        5.3
Java       1.8

Prerequisites

Download and install JDK 8. For detailed operations, see Download JDK 8.
Download and install Storm. See Apache Storm downloads.

Operation Steps

Step 1: Obtaining the CKafka Instance Access Address

1. Log in to the CKafka console.
2. In the left sidebar, select Instance List and click the ID of the target instance to go to the basic instance information page.
3. In the Access Mode section of the basic instance information page, obtain the access address of the instance.
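The access address is later used as the ip:port value of BOOTSTRAP_SERVERS in the sample code. As a minimal sketch, a small check like the following can fail fast if the xx.xx.xx.xx:xxxx placeholder was left unchanged. AccessAddress is a hypothetical helper written for this illustration, and the address 10.0.0.1:9092 is an example value only, not a real instance address.

```java
// Hypothetical helper for validating an "ip:port" access address. Not part of any SDK.
class AccessAddress {
    final String host;
    final int port;

    private AccessAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Parses "host:port" and rejects values whose port is missing or not a valid number.
    static AccessAddress parse(String address) {
        int idx = address.lastIndexOf(':');
        if (idx <= 0 || idx == address.length() - 1) {
            throw new IllegalArgumentException("Expected host:port, got: " + address);
        }
        String host = address.substring(0, idx).trim();
        int port;
        try {
            port = Integer.parseInt(address.substring(idx + 1).trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Port is not a number in: " + address, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range in: " + address);
        }
        return new AccessAddress(host, port);
    }

    public static void main(String[] args) {
        // Example value only. Replace it with the address shown in the console.
        AccessAddress a = parse("10.0.0.1:9092");
        System.out.println(a.host + " -> " + a.port);
    }
}
```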



Step 2: Creating a Topic

1. On the basic instance information page, select the Topic List tab at the top.
2. On the Topic Management page, click Create to create a Topic.



Step 3: Adding Maven Dependencies

Configure pom.xml as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>storm</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>ExclamationTopology</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

Step 4: Producing Messages

Using spout/bolt

Topology code:
//TopologyKafkaProducerSpout.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

import java.util.Properties;

public class TopologyKafkaProducerSpout {
//ip:port of the CKafka instance applied for.
private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
//Specify the topic to which to write messages.
private final static String TOPIC = "storm_test";
public static void main(String[] args) throws Exception {
//Set producer attributes.
//See: https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
//See properties at: http://kafka.apache.org/0102/documentation.html
Properties properties = new Properties();
properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
properties.put("acks", "1");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

//Create a bolt that writes to Kafka. By default, the fields "key" and "message" are used as the key and value of the produced message. Other field names can be specified in FieldNameBasedTupleToKafkaMapper().
KafkaBolt kafkaBolt = new KafkaBolt()
.withProducerProperties(properties)
.withTopicSelector(new DefaultTopicSelector(TOPIC))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
TopologyBuilder builder = new TopologyBuilder();
//A spout class that generates messages in sequence with the output field being a sentence.
SerialSentenceSpout spout = new SerialSentenceSpout();
AddMessageKeyBolt bolt = new AddMessageKeyBolt();
builder.setSpout("kafka-spout", spout, 1);
//Add the fields required to produce messages to CKafka for the tuple.
builder.setBolt("add-key", bolt, 1).shuffleGrouping("kafka-spout");
//Write to CKafka.
builder.setBolt("sendToKafka", kafkaBolt, 8).shuffleGrouping("add-key");

Config config = new Config();
if (args != null && args.length > 0) {
//Cluster mode, which is used to package a jar file and run it in Storm.
config.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
} else {
//Local mode
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}

}
}
Create a spout class that generates messages in sequence:
//SerialSentenceSpout.java
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.UUID;

public class SerialSentenceSpout extends BaseRichSpout {

private SpoutOutputCollector spoutOutputCollector;

@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}

@Override
public void nextTuple() {
Utils.sleep(1000);
//Produce a UUID string and send it to the next component.
spoutOutputCollector.emit(new Values(UUID.randomUUID().toString()));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("sentence"));
}
}
Add two fields, key and message, to the tuple. When the key is null, the produced messages are evenly distributed across all partitions. If a key is specified, the message will be hashed to a specific partition based on the key value:
//AddMessageKeyBolt.java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class AddMessageKeyBolt extends BaseBasicBolt {

@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
//Get the value of the first field.
String message = tuple.getString(0);
// System.out.println(message);
//Send to the next component.
basicOutputCollector.emit(new Values(null, message));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//Declare the output schema sent to the next component.
outputFieldsDeclarer.declare(new Fields("key", "message"));
}
}
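The partitioning behavior described for null and non-null keys can be illustrated with a simplified sketch. This is not Kafka's actual partitioner: PartitionSketch is a hypothetical class, and String.hashCode() stands in for the murmur2 hash that the Kafka client applies to the serialized key.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified illustration of key-based partition selection. NOT Kafka's actual partitioner.
class PartitionSketch {
    private final AtomicInteger counter = new AtomicInteger();
    private final int numPartitions;

    PartitionSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    int partitionFor(String key) {
        if (key == null) {
            // No key: rotate through the partitions so that messages spread evenly.
            return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
        }
        // Key present: the same key always maps to the same partition.
        // Assumption: String.hashCode() replaces Kafka's murmur2 hash for brevity.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionSketch p = new PartitionSketch(3);
        for (int i = 0; i < 4; i++) {
            System.out.println("null key -> partition " + p.partitionFor(null));
        }
        System.out.println("order-42 -> partition " + p.partitionFor("order-42"));
        System.out.println("order-42 -> partition " + p.partitionFor("order-42"));
    }
}
```

Because a given key always lands on the same partition, messages that share a key keep their relative order, while null keys trade that ordering for an even spread.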

Using trident

Use the trident class to generate a topology:
//TopologyKafkaProducerTrident.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
import org.apache.storm.kafka.trident.TridentKafkaStateUpdater;
import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Properties;

public class TopologyKafkaProducerTrident {
//ip:port of the CKafka instance applied for.
private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
//Specify the topic to which to write messages.
private final static String TOPIC = "storm_test";
public static void main(String[] args) throws Exception {
//Set producer attributes.
//See: https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
//See properties at: http://kafka.apache.org/0102/documentation.html
Properties properties = new Properties();
properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
properties.put("acks", "1");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//Set the trident.
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
.withProducerProperties(properties)
.withKafkaTopicSelector(new DefaultTopicSelector(TOPIC))
// Use the fields "key" and "value" for the written message. Unlike the KafkaBolt mapper, the Trident FieldNameBasedTupleToKafkaMapper has no default field names, so they must be specified.
.withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "value"));
TridentTopology builder = new TridentTopology();
//A spout that generates messages in batches with the output field being a sentence.
builder.newStream("kafka-spout", new TridentSerialSentenceSpout(5))
.each(new Fields("sentence"), new AddMessageKey(), new Fields("key", "value"))
.partitionPersist(stateFactory, new Fields("key", "value"), new TridentKafkaStateUpdater(), new Fields());

Config config = new Config();
if (args != null && args.length > 0) {
//Cluster mode, which is used to package a jar file and run it in Storm.
config.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
} else {
//Local mode
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, builder.build());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}

}

private static class AddMessageKey extends BaseFunction {

@Override
public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
//Get the value of the first field.
String message = tridentTuple.getString(0);
//System.out.println(message);
//Send to the next component.
//tridentCollector.emit(new Values(Integer.toString(message.hashCode()), message));
tridentCollector.emit(new Values(null, message));
}
}
}
Create a spout class that generates messages in batches:
//TridentSerialSentenceSpout.java
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.UUID;

public class TridentSerialSentenceSpout implements IBatchSpout {

private final int batchCount;

public TridentSerialSentenceSpout(int batchCount) {
this.batchCount = batchCount;
}

@Override
public void open(Map map, TopologyContext topologyContext) {

}

@Override
public void emitBatch(long l, TridentCollector tridentCollector) {
Utils.sleep(1000);
for(int i = 0; i < batchCount; i++){
tridentCollector.emit(new Values(UUID.randomUUID().toString()));
}
}

@Override
public void ack(long l) {

}

@Override
public void close() {

}

@Override
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
conf.setMaxTaskParallelism(1);
return conf;
}

@Override
public Fields getOutputFields() {
return new Fields("sentence");
}
}

Step 5: Consuming Messages

Using spout/bolt

//TopologyKafkaConsumerSpout.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.*;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.HashMap;
import java.util.Map;

import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;

public class TopologyKafkaConsumerSpout {
//ip:port of the CKafka instance applied for.
private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
//Specify the topic from which to consume messages.
private final static String TOPIC = "storm_test";

public static void main(String[] args) throws Exception {
//Set a retry policy.
KafkaSpoutRetryService kafkaSpoutRetryService = new KafkaSpoutRetryExponentialBackoff(
KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
Integer.MAX_VALUE,
KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10)
);
ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
(r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
new Fields("topic", "partition", "offset", "key", "value"));
//Set consumer parameters.
//See the function at http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
//See parameters at http://kafka.apache.org/0102/documentation.html
KafkaSpoutConfig spoutConfig = KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
.setProp(new HashMap<String, Object>(){{
put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); //Set the group.
put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); //Set the session timeout.
put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); //Set the request timeout.
}})
.setOffsetCommitPeriodMs(10_000) //Set the offset commit period.
.setFirstPollOffsetStrategy(LATEST) //Start consuming from the latest offset.
.setRetry(kafkaSpoutRetryService)
.setRecordTranslator(trans)
.build();

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
builder.setBolt("bolt", new BaseRichBolt(){
private OutputCollector outputCollector;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

}

@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.outputCollector = outputCollector;
}

@Override
public void execute(Tuple tuple) {
System.out.println(tuple.getStringByField("value"));
outputCollector.ack(tuple);
}
}, 1).shuffleGrouping("kafka-spout");

Config config = new Config();
config.setMaxSpoutPending(20);
if (args != null && args.length > 0) {
config.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
}
else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, builder.createTopology());
Utils.sleep(20000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
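The retry policy in the consumer above passes four values to KafkaSpoutRetryExponentialBackoff: an initial delay of 500 microseconds, a delay period of 2 milliseconds, a maximum retry count, and a maximum delay of 10 seconds. The following sketch shows how the delay grows under those values. It assumes the schedule described for storm-kafka-client 2.x (the initial delay on the first failure, then delayPeriod * 2^(failCount-1), capped at the maximum delay); BackoffSketch is a hypothetical class, so verify the formula against the version you use.

```java
// Hypothetical sketch of an exponential backoff schedule. Not the Storm implementation.
class BackoffSketch {

    // Returns the delay before the next retry, in microseconds.
    static long delayMicros(int failCount, long initialDelay, long delayPeriod, long maxDelay) {
        if (failCount < 1) {
            throw new IllegalArgumentException("failCount must be at least 1");
        }
        if (failCount == 1) {
            return Math.min(initialDelay, maxDelay);
        }
        int shift = failCount - 1;
        // Guard against overflow before computing delayPeriod * 2^shift.
        if (shift >= 62 || delayPeriod > (Long.MAX_VALUE >> shift)) {
            return maxDelay;
        }
        return Math.min(delayPeriod << shift, maxDelay);
    }

    public static void main(String[] args) {
        long initial = 500;        // 500 microseconds
        long period = 2_000;       // 2 milliseconds
        long max = 10_000_000;     // 10 seconds
        for (int fail = 1; fail <= 15; fail++) {
            System.out.println("failure " + fail + " -> wait "
                    + delayMicros(fail, initial, period, max) + " us");
        }
    }
}
```

Under these assumptions the delay reaches the 10-second cap at the 14th consecutive failure and stays there, so a persistently failing tuple is retried about every 10 seconds.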

Using trident

//TopologyKafkaConsumerTrident.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.HashMap;

import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;


public class TopologyKafkaConsumerTrident {
//ip:port of the CKafka instance applied for.
private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
//Specify the topic from which to consume messages.
private final static String TOPIC = "storm_test";

public static void main(String[] args) throws Exception {
ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
(r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
new Fields("topic", "partition", "offset", "key", "value"));
//Set consumer parameters.
//See the function at http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
//See parameters at http://kafka.apache.org/0102/documentation.html
KafkaTridentSpoutConfig spoutConfig = KafkaTridentSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
.setProp(new HashMap<String, Object>(){{
put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); //Set the group.
put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); //Enable automatic offset commit.
put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); //Set the session timeout.
put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); //Set the request timeout.
}})
.setFirstPollOffsetStrategy(LATEST) //Start consuming from the latest offset.
.setRecordTranslator(trans)
.build();

TridentTopology builder = new TridentTopology();
// Stream spoutStream = builder.newStream("spout", new KafkaTridentSpoutTransactional(spoutConfig)); // Transaction type
Stream spoutStream = builder.newStream("spout", new KafkaTridentSpoutOpaque(spoutConfig));
spoutStream.each(spoutStream.getOutputFields(), new BaseFunction(){
@Override
public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
System.out.println(tridentTuple.getStringByField("value"));
tridentCollector.emit(new Values(tridentTuple.getStringByField("value")));
}
}, new Fields("message"));

Config conf = new Config();
conf.setMaxSpoutPending(20);
conf.setNumWorkers(1);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.build());
}
else {
StormTopology stormTopology = builder.build();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, stormTopology);
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
stormTopology.clear();
}
}
}
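Both consumer examples set the first-poll offset strategy to LATEST. As a rough sketch of how the four strategy names in storm-kafka-client choose a starting position, the following plain-Java model may help. OffsetStrategySketch and its enum are hypothetical stand-ins that only mirror the names of FirstPollOffsetStrategy, and the semantics shown are an assumption based on the library documentation, so check them against the version you deploy.

```java
import java.util.OptionalLong;

// Hypothetical model of first-poll offset selection. Not the Storm implementation.
class OffsetStrategySketch {

    enum Strategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    // earliest: first offset available in the partition
    // latest: offset of the next message to be written
    // committed: last committed offset of the consumer group, if any
    static long startOffset(Strategy strategy, long earliest, long latest, OptionalLong committed) {
        switch (strategy) {
            case EARLIEST:
                return earliest;                    // ignore any committed offset
            case LATEST:
                return latest;                      // ignore any committed offset
            case UNCOMMITTED_EARLIEST:
                return committed.orElse(earliest);  // resume if committed, else start from the beginning
            case UNCOMMITTED_LATEST:
                return committed.orElse(latest);    // resume if committed, else start from the end
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        OptionalLong committed = OptionalLong.of(120);
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " -> " + startOffset(s, 0, 500, committed));
        }
    }
}
```

With LATEST, messages produced before the topology starts are skipped, which suits a quick test but may not suit a job that must process a backlog.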

Step 6: Submitting to Storm

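After compiling with mvn package, you can run the topology in local mode for debugging or submit it to the production cluster. In the commands below, topology_name is the main class of the topology (for example, TopologyKafkaProducerSpout). In the sample code, running without an additional argument starts local mode, and passing task_name submits the topology to the cluster under that name.
storm jar your_jar_name.jar topology_name
storm jar your_jar_name.jar topology_name task_name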

