root, and the password is the one you set when creating the EMR cluster. Once the correct credentials are entered, you can enter the command line interface. Switch to the Hadoop user and go to the Spark installation directory /usr/local/service/spark:

[root@172 ~]# su hadoop
[hadoop@172 root]$ cd /usr/local/service/spark
/opt directory:

[hadoop@172 data]$ tar -xzvf kafka_2.10-0.10.2.0.tgz
[hadoop@172 data]$ mv kafka_2.10-0.10.2.0 /opt/
telnet command to see whether the EMR cluster is connected to the CKafka instance:

[hadoop@172 kafka_2.10-0.10.2.0]$ telnet $kafkaIP 9092
Trying $kafkaIP...
Connected to $kafkaIP.
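If telnet is not installed on the node, the same TCP reachability check can be sketched in Java. This is a minimal sketch: the host in `main` is a placeholder for your CKafka instance IP, and 9092 is the default Kafka port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder: replace with your CKafka instance IP.
        System.out.println(isReachable("10.0.0.1", 9092, 3000));
    }
}
```

Note that this only confirms the port is open; it does not validate the Kafka protocol handshake the way a real client connection would.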
[root@172 ~]# su hadoop
[hadoop@172 root]$ cd /opt/kafka_2.10-0.10.2.0/
[hadoop@172 kafka_2.10-0.10.2.0]$ bin/kafka-console-producer.sh --broker-list $kafkaIP:9092 --topic spark_streaming_test
hello world
this is a message
[hadoop@172 kafka_2.10-0.10.2.0]$ bin/kafka-console-consumer.sh --bootstrap-server $kafkaIP:9092 --from-beginning --new-consumer --topic spark_streaming_test
hello world
this is a message
D://mavenWorkplace, by running the following commands:

mvn archetype:generate -DgroupId=$yourgroupID -DartifactId=$yourartifactID -DarchetypeArtifactId=maven-archetype-quickstart
D://mavenWorkplace directory. The files included in the folder have the following structure:

simple
---pom.xml         Core configuration, under the project root directory
---src
   ---main
      ---java      Java source code directory
      ---resources Java configuration file directory
   ---test
      ---java      Test source code directory
      ---resources Test configuration directory
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>utf-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.*;

/**
 * Created by tencent on 2018/7/3.
 */
public class KafkaTest {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "$kafkaIP:9092";
        String topics = "spark_streaming_test1"; // Subscribed topics; separate multiple topics with ','
        int durationSeconds = 60; // Batch interval

        SparkConf conf = new SparkConf().setAppName("spark streaming word count");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(durationSeconds));

        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        // Kafka-related parameters
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create a direct stream connection to Kafka
        JavaInputDStream<ConsumerRecord<Object, Object>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

        // Word count logic
        JavaPairDStream<String, Integer> counts = lines
                .flatMap(x -> Arrays.asList(x.value().toString().split(" ")).iterator())
                .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
                .reduceByKey((x, y) -> x + y);

        // Save the result to HDFS
        counts.dstream().saveAsTextFiles("$hdfsPath", "result");

        // Start the computation
        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }
}
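The counting step in the program above (split each line into words, pair every word with 1, reduce by key) can be illustrated on plain Java collections, independent of Spark. This is a minimal sketch using `java.util.stream`, not part of the Spark job itself:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCountSketch {
    // Mirrors the DStream pipeline: flatMap(split) -> mapToPair(word, 1) -> reduceByKey(+)
    public static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("hello world", "this is a message", "hello again");
        // Prints each distinct word with its count; map ordering may vary.
        System.out.println(count(batch));
    }
}
```

The Spark version distributes the same logic across a cluster and applies it once per batch interval.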
mvn package
scp $localfile root@<public IP address>:$remotefolder
D://mavenWorkplace, by running the following commands:

mvn archetype:generate -DgroupId=$yourgroupID -DartifactId=$yourartifactID -DarchetypeArtifactId=maven-archetype-quickstart
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.1.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>utf-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by tencent on 2018/7/4.
 */
public class SendData {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "$kafkaIP:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The producer sends a batch of messages every 10 seconds
        String topic = "spark_streaming_test1";
        Producer<String, String> producer = new KafkaProducer<>(props);
        while (true) {
            int num = (int) (Math.random() * 10);
            for (int i = 0; i <= 10; i++) {
                int tmp = (num + i) % 10;
                String value = "value_" + tmp;
                ProducerRecord<String, String> msg = new ProducerRecord<>(topic, value);
                producer.send(msg);
            }
            try {
                Thread.sleep(1000 * 10);
            } catch (InterruptedException e) {
                // Ignore and keep sending
            }
        }
    }
}
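The payload loop in SendData can be viewed in isolation: starting from a random offset num, it emits eleven messages value_0 through value_9 (with one value repeated) by stepping modulo 10. A minimal sketch of just that generation logic, separated from the Kafka client:

```java
import java.util.ArrayList;
import java.util.List;

public class ValueBatch {
    // Reproduces the SendData loop body: for i in 0..10, emit "value_" + ((num + i) % 10)
    public static List<String> generate(int num) {
        List<String> values = new ArrayList<>();
        for (int i = 0; i <= 10; i++) {
            values.add("value_" + ((num + i) % 10));
        }
        return values;
    }

    public static void main(String[] args) {
        // Eleven values cycling through value_0..value_9; the first and last repeat.
        System.out.println(generate(7));
    }
}
```

Because each batch covers all ten values once plus one repeat, the per-value counts accumulated by the streaming job grow roughly evenly over time, which matches the output shown later.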
mvn package
scp $localfile root@<public IP address>:$remotefolder
[hadoop@172 ~]$ bin/spark-submit --class KafkaTest --master yarn-cluster $consumerpackage
[hadoop@172 ~]$ yarn application --list
[hadoop@172 spark]$ bin/spark-submit --class SendData $producerpackage
[hadoop@172 root]$ hdfs dfs -ls /user
Found 9 items
drwxr-xr-x   - hadoop supergroup          0 2018-07-03 16:37 /user/hadoop
drwxr-xr-x   - hadoop supergroup          0 2018-06-19 10:10 /user/hive
-rw-r--r--   3 hadoop supergroup          0 2018-06-29 10:19 /user/pythontest.txt
drwxr-xr-x   - hadoop supergroup          0 2018-07-05 20:25 /user/sparkstreamingtest-1530793500000.result
[hadoop@172 root]$ hdfs dfs -cat /user/sparkstreamingtest-1530793500000.result/*
(value_6,16)
(value_7,22)
(value_8,18)
(value_0,18)
(value_9,17)
(value_1,18)
(value_2,17)
(value_3,17)
(value_4,16)
(value_5,17)
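Each output line has the form (word,count), which is the default string form of a Scala Tuple2. If you need the counts programmatically, a line like that can be parsed as follows. This is a minimal sketch; it assumes the word itself contains no comma, which holds for the value_N payloads used here:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class ResultLineParser {
    // Parses a line such as "(value_6,16)" into an entry ("value_6", 16).
    // Assumes the key contains no comma, matching the value_N payloads above.
    public static Map.Entry<String, Integer> parse(String line) {
        String body = line.substring(1, line.length() - 1); // strip "(" and ")"
        int comma = body.lastIndexOf(',');
        return new SimpleEntry<>(body.substring(0, comma),
                Integer.parseInt(body.substring(comma + 1)));
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> e = parse("(value_6,16)");
        System.out.println(e.getKey() + " -> " + e.getValue());
    }
}
```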
[hadoop@172 ~]$ yarn application --kill $Application-Id
yarn application --list command.
For more information on Kafka, please see the official documentation.