Storm is a distributed real-time computing framework that performs stream-based data processing and provides generic distributed RPC, bringing event-processing latency down to the sub-second level. It is suitable for real-time data processing scenarios that require low latency.
There are two types of nodes in a Storm cluster: the master node and the worker nodes. The Nimbus process runs on the master node and is responsible for resource allocation and status monitoring, while the Supervisor process runs on each worker node, listening for assigned tasks and starting or stopping executors. The entire cluster relies on ZooKeeper for shared data storage, cluster status monitoring, task assignment, and so on.
A data processing program submitted to Storm is called a topology. The smallest message unit it processes is a tuple, an array of arbitrary objects. A topology consists of spouts and bolts: a spout is the source of tuples, while a bolt can subscribe to any tuple emitted by a spout or another bolt and process it.
Storm can use CKafka as a spout to consume data for processing, or as a bolt to store the processed data for consumption by other components.
Operating system: CentOS 6.8

| Package | Version |
|---|---|
| Maven | 3.5.0 |
| Storm | 2.1.0 |
| SSH | 5.3 |
| Java | 1.8 |
Configure `pom.xml` as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>storm</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>ExclamationTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Topology code for writing messages to CKafka:
//TopologyKafkaProducerSpout.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import java.util.Properties;
public class TopologyKafkaProducerSpout {
    // `ip:port` of your CKafka instance
private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
// Specify the topic to which to write messages
private final static String TOPIC = "storm_test";
public static void main(String[] args) throws Exception {
// Set producer attributes
        // For the API, see https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
        // For the configuration properties, see http://kafka.apache.org/0102/documentation.html
Properties properties = new Properties();
properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
properties.put("acks", "1");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Create the bolt that writes to Kafka. By default, the tuple fields `key` and `message` are used as the key and message of the produced record; different field names can be specified in `FieldNameBasedTupleToKafkaMapper()`
KafkaBolt kafkaBolt = new KafkaBolt()
.withProducerProperties(properties)
.withTopicSelector(new DefaultTopicSelector(TOPIC))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
TopologyBuilder builder = new TopologyBuilder();
// A spout class that generates messages in sequence with the output field being `sentence`
SerialSentenceSpout spout = new SerialSentenceSpout();
AddMessageKeyBolt bolt = new AddMessageKeyBolt();
builder.setSpout("kafka-spout", spout, 1);
        // Add the fields required for writing to CKafka to the tuple
builder.setBolt("add-key", bolt, 1).shuffleGrouping("kafka-spout");
// Write to CKafka
builder.setBolt("sendToKafka", kafkaBolt, 8).shuffleGrouping("add-key");
Config config = new Config();
if (args != null && args.length > 0) {
            // Cluster mode: used when the packaged jar is submitted to a Storm cluster
config.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
} else {
// Local mode
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
Create a spout class that generates messages in sequence:
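//SerialSentenceSpout.java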
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.UUID;
public class SerialSentenceSpout extends BaseRichSpout {
private SpoutOutputCollector spoutOutputCollector;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}
@Override
public void nextTuple() {
Utils.sleep(1000);
// Produce a `UUID` string and send it to the next component
spoutOutputCollector.emit(new Values(UUID.randomUUID().toString()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("sentence"));
}
}
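This spout emits tuples without a message ID, so Storm does not track or replay them. If at-least-once processing is needed, a tuple can be emitted with an ID and the spout can react to acknowledgments; a minimal sketch (the replay logic is left as an assumption):

// In nextTuple(): pass a message ID as the second argument so Storm tracks the tuple
String sentence = UUID.randomUUID().toString();
spoutOutputCollector.emit(new Values(sentence), sentence);

@Override
public void ack(Object msgId) {
    // The tuple tree rooted at `msgId` was fully processed
}

@Override
public void fail(Object msgId) {
    // Processing failed or timed out; a real spout would re-emit the message here
}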
Add the `key` and `message` fields to the tuple. If `key` is null, the produced messages are evenly distributed across partitions; if a key is specified, messages are hashed to specific partitions based on the key value:
//AddMessageKeyBolt.java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class AddMessageKeyBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
// Take out the first field value
        String message = tuple.getString(0);
        // System.out.println(message);
        // Send to the next component
        basicOutputCollector.emit(new Values(null, message));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
// Create a schema to send to the next component
outputFieldsDeclarer.declare(new Fields("key", "message"));
}
}
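If messages with the same key should always land on the same partition, emit a non-null key instead. For example, a variant of the `execute` method above that keys each message by its own hash (mirroring the commented-out line in the Trident example below):

@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
    String message = tuple.getString(0);
    // A non-null key causes messages to be hashed to a specific partition
    basicOutputCollector.emit(new Values(Integer.toString(message.hashCode()), message));
}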
Use the Trident API to generate a topology:
//TopologyKafkaProducerTrident.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
import org.apache.storm.kafka.trident.TridentKafkaStateUpdater;
import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Properties;
public class TopologyKafkaProducerTrident {
    // `ip:port` of your CKafka instance
private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
// Specify the topic to which to write messages
private final static String TOPIC = "storm_test";
public static void main(String[] args) throws Exception {
// Set producer attributes
        // For the API, see https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
        // For the configuration properties, see http://kafka.apache.org/0102/documentation.html
Properties properties = new Properties();
properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
properties.put("acks", "1");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Set the trident
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
.withProducerProperties(properties)
.withKafkaTopicSelector(new DefaultTopicSelector(TOPIC))
            // Use `fields("key", "value")` as the written message. Unlike its bolt counterpart, the Trident version of `FieldNameBasedTupleToKafkaMapper` has no default field names
.withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "value"));
TridentTopology builder = new TridentTopology();
// A spout that generates messages in batches with the output field being `sentence`
builder.newStream("kafka-spout", new TridentSerialSentenceSpout(5))
.each(new Fields("sentence"), new AddMessageKey(), new Fields("key", "value"))
.partitionPersist(stateFactory, new Fields("key", "value"), new TridentKafkaStateUpdater(), new Fields());
Config config = new Config();
if (args != null && args.length > 0) {
            // Cluster mode: used when the packaged jar is submitted to a Storm cluster
config.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
} else {
// Local mode
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, builder.build());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
private static class AddMessageKey extends BaseFunction {
@Override
public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
// Take out the first field value
            String message = tridentTuple.getString(0);
            //System.out.println(message);
            // Send to the next component
            //tridentCollector.emit(new Values(Integer.toString(message.hashCode()), message));
            tridentCollector.emit(new Values(null, message));
}
}
}
Create a spout class that generates messages in batches:
//TridentSerialSentenceSpout.java
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.UUID;
public class TridentSerialSentenceSpout implements IBatchSpout {
private final int batchCount;
public TridentSerialSentenceSpout(int batchCount) {
this.batchCount = batchCount;
}
@Override
public void open(Map map, TopologyContext topologyContext) {
}
@Override
public void emitBatch(long l, TridentCollector tridentCollector) {
Utils.sleep(1000);
for(int i = 0; i < batchCount; i++){
tridentCollector.emit(new Values(UUID.randomUUID().toString()));
}
}
@Override
public void ack(long l) {
}
@Override
public void close() {
}
@Override
    public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
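        // Limit this spout to a single task so that message batches are generated serially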
conf.setMaxTaskParallelism(1);
return conf;
}
@Override
public Fields getOutputFields() {
return new Fields("sentence");
}
}
To consume data from CKafka, create a spout that subscribes to the topic:
//TopologyKafkaConsumerSpout.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.*;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;
public class TopologyKafkaConsumerSpout {
    // `ip:port` of your CKafka instance
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    // Specify the topic from which to consume messages
    private final static String TOPIC = "storm_test";
public static void main(String[] args) throws Exception {
// Set a retry policy
KafkaSpoutRetryService kafkaSpoutRetryService = new KafkaSpoutRetryExponentialBackoff(
KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
Integer.MAX_VALUE,
KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10)
);
        ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
(r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
new Fields("topic", "partition", "offset", "key", "value"));
// Set consumer parameters
        // For the API, see http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
        // For the configuration parameters, see http://kafka.apache.org/0102/documentation.html
KafkaSpoutConfig spoutConfig = KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
            .setProp(new HashMap<String, Object>(){{
put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); // Set the group
put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); // Set the session timeout period
put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); // Set the request timeout period
}})
            .setOffsetCommitPeriodMs(10_000) // Set the offset auto-commit period
            .setFirstPollOffsetStrategy(LATEST) // Start consuming from the latest offset
.setRetry(kafkaSpoutRetryService)
.setRecordTranslator(trans)
.build();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
builder.setBolt("bolt", new BaseRichBolt(){
private OutputCollector outputCollector;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.outputCollector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
System.out.println(tuple.getStringByField("value"));
outputCollector.ack(tuple);
}
}, 1).shuffleGrouping("kafka-spout");
Config config = new Config();
config.setMaxSpoutPending(20);
if (args != null && args.length > 0) {
config.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
}
else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, builder.createTopology());
Utils.sleep(20000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
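Note that `LATEST` makes the spout start from the end of the topic the first time it runs, so messages produced before the topology starts are skipped. To resume from the last committed offset instead (falling back to the earliest offset when nothing has been committed yet), the strategy in the builder chain above can be swapped; a sketch:

// In the KafkaSpoutConfig builder above, replace the LATEST strategy with:
.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
// (uses `import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;` instead of the static import of LATEST)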
Consume data from CKafka using the Trident API:
//TopologyKafkaConsumerTrident.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.HashMap;
import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;
public class TopologyKafkaConsumerTrident {
    // `ip:port` of your CKafka instance
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    // Specify the topic from which to consume messages
    private final static String TOPIC = "storm_test";
public static void main(String[] args) throws Exception {
        ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
(r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
new Fields("topic", "partition", "offset", "key", "value"));
// Set consumer parameters
        // For the API, see http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
        // For the configuration parameters, see http://kafka.apache.org/0102/documentation.html
KafkaTridentSpoutConfig spoutConfig = KafkaTridentSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
            .setProp(new HashMap<String, Object>(){{
put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); // Set the group
                put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // Enable automatic offset commits
put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); // Set the session timeout period
put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); // Set the request timeout period
}})
            .setFirstPollOffsetStrategy(LATEST) // Start consuming from the latest offset
.setRecordTranslator(trans)
.build();
TridentTopology builder = new TridentTopology();
// Stream spoutStream = builder.newStream("spout", new KafkaTridentSpoutTransactional(spoutConfig)); // Transaction type
Stream spoutStream = builder.newStream("spout", new KafkaTridentSpoutOpaque(spoutConfig));
spoutStream.each(spoutStream.getOutputFields(), new BaseFunction(){
@Override
public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
System.out.println(tridentTuple.getStringByField("value"));
tridentCollector.emit(new Values(tridentTuple.getStringByField("value")));
}
}, new Fields("message"));
Config conf = new Config();
        conf.setMaxSpoutPending(20);
        conf.setNumWorkers(1);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.build());
}
else {
StormTopology stormTopology = builder.build();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, stormTopology);
Utils.sleep(10000);
cluster.killTopology("test");
            cluster.shutdown();
            stormTopology.clear();
}
}
}
After the project is packaged with `mvn package`, the topology can be run on a local cluster for debugging or submitted to a production cluster:
storm jar your_jar_name.jar main_class
storm jar your_jar_name.jar main_class topology_name
With no extra argument, the code above runs in local mode; with an extra argument, it is submitted to the cluster under that topology name (`args[0]` in the code).