Log in to the EMR cluster, switch to the hadoop user, and go to the directory /usr/local/service/spark:

[root@172 ~]# su hadoop
[hadoop@172 root]$ cd /usr/local/service/spark
Extract the Kafka installation package and move it to the /opt directory:

[hadoop@172 data]$ tar -xzvf kafka_2.10-0.10.2.0.tgz
[hadoop@172 data]$ mv kafka_2.10-0.10.2.0 /opt/
Use the telnet command to test whether the EMR cluster can connect to the CKafka instance:

[hadoop@172 kafka_2.10-0.10.2.0]$ telnet $kafkaIP 9092
Trying $kafkaIP...
Connected to $kafkaIP.
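If telnet is not installed on the node, the same reachability check can be done from Java. The sketch below is an illustrative addition, not part of the original guide; the class name ConnectivityCheck and the 5-second timeout are arbitrary choices. It simply opens a plain TCP connection to the broker port:

import java.net.InetSocketAddress;
import java.net.Socket;

public class ConnectivityCheck {
    public static void main(String[] args) throws Exception {
        String kafkaIP = args[0]; // the CKafka instance IP, i.e. the $kafkaIP placeholder above
        try (Socket socket = new Socket()) {
            // A successful connect() means the broker port is reachable from this node.
            socket.connect(new InetSocketAddress(kafkaIP, 9092), 5000);
            System.out.println("Connected to " + kafkaIP + ":9092");
        }
    }
}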
[root@172 ~]# su hadoop
[hadoop@172 root]$ cd /opt/kafka_2.10-0.10.2.0/
[hadoop@172 kafka_2.10-0.10.2.0]$ bin/kafka-console-producer.sh --broker-list $kafkaIP:9092 --topic spark_streaming_test
hello world
this is a message
[hadoop@172 kafka_2.10-0.10.2.0]$ bin/kafka-console-consumer.sh --bootstrap-server $kafkaIP:9092 --from-beginning --new-consumer --topic spark_streaming_test
hello world
this is a message
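The same receive check can also be scripted against the Kafka client API. Below is a minimal consumer sketch; it is an illustrative addition, not from the original guide, and the class name ConsoleCheck and the group id check_group are arbitrary. It mirrors the console consumer above, reading spark_streaming_test from the beginning:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class ConsoleCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "$kafkaIP:9092");
        props.put("group.id", "check_group");
        props.put("auto.offset.reset", "earliest"); // mirrors --from-beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("spark_streaming_test"));
            // Poll a few times, print whatever arrives, then exit.
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}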
In the local directory D://mavenWorkplace, run the following command to create a new Maven project:

mvn archetype:generate -DgroupId=$yourgroupID -DartifactId=$yourartifactID -DarchetypeArtifactId=maven-archetype-quickstart
A project folder named $yourartifactID is then generated under D://mavenWorkplace, with the following file structure:

simple
---pom.xml  core configuration, at the project root
---src
   ---main
      ---java  Java source directory
      ---resources  Java configuration file directory
   ---test
      ---java  test source directory
      ---resources  test configuration directory
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>utf-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.*;

/**
 * Created by tencent on 2018/7/3.
 */
public class KafkaTest {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "$kafkaIP:9092";
        String topics = "spark_streaming_test1"; // topics to subscribe to; separate multiple topics with ','
        int durationSeconds = 60;                // batch interval

        SparkConf conf = new SparkConf().setAppName("spark streaming word count");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(durationSeconds));
        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));

        // Kafka-related parameters
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create the direct stream to Kafka
        JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));

        // Word-count logic
        JavaPairDStream<String, Integer> counts = lines
                .flatMap(x -> Arrays.asList(x.value().split(" ")).iterator())
                .mapToPair(x -> new Tuple2<>(x, 1))
                .reduceByKey((x, y) -> x + y);

        // Save the results
        counts.dstream().saveAsTextFiles("$hdfsPath", "result");

        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }
}
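To make the per-batch logic concrete: each batch of Kafka records is split into words, each word is paired with 1, and the pairs are summed per key. The standalone sketch below is an illustrative addition, not from the original guide (WordCountBatch is an arbitrary name); it reproduces the same computation on an in-memory list with plain Java streams, so the expected output can be checked without a cluster:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WordCountBatch {
    public static void main(String[] args) {
        List<String> batch = Arrays.asList("hello world", "this is a message", "hello again");
        Map<String, Integer> counts = batch.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))           // flatMap step
                .collect(Collectors.toMap(w -> w, w -> 1, Integer::sum));  // mapToPair + reduceByKey
        System.out.println(counts); // per-word counts, e.g. hello=2 (map order not guaranteed)
    }
}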
mvn package
scp $localfile root@[public IP address]:$remotefolder
In the local directory D://mavenWorkplace, run the following command to create a new Maven project:

mvn archetype:generate -DgroupId=$yourgroupID -DartifactId=$yourartifactID -DarchetypeArtifactId=maven-archetype-quickstart
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.1.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>utf-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by tencent on 2018/7/4.
 */
public class SendData {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "$kafkaIP:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The producer sends messages
        String topic = "spark_streaming_test1";
        Producer<String, String> producer = new KafkaProducer<>(props);

        while (true) {
            int num = (int) (Math.random() * 10);
            for (int i = 0; i <= 10; i++) {
                int tmp = (num + i) % 10;
                String value = "value_" + tmp;
                ProducerRecord<String, String> msg = new ProducerRecord<>(topic, value);
                producer.send(msg);
            }
            try {
                Thread.sleep(1000 * 10); // send a batch of messages every 10 seconds
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        producer.close();
    }
}
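When testing the producer, it can help to confirm that each record was actually acknowledged by the broker. The sketch below is an illustrative addition, not from the original guide (SendWithCallback is an arbitrary name); it uses the callback form of KafkaProducer.send() for that:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SendWithCallback {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "$kafkaIP:9092"); // same placeholder as above
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> msg =
                    new ProducerRecord<>("spark_streaming_test1", "value_test");
            // The callback runs once the broker acknowledges (or rejects) the record.
            producer.send(msg, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("sent to %s partition %d offset %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
        } // closing the producer flushes any buffered records
    }
}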
mvn package
scp $localfile root@[public IP address]:$remotefolder
[hadoop@172 ~]$ bin/spark-submit --class KafkaTest --master yarn-cluster $consumerpackage
[hadoop@172 ~]$ yarn application -list
[hadoop@172 spark]$ bin/spark-submit --class SendData $producerpackage
[hadoop@172 root]$ hdfs dfs -ls /user
Found 9 items
drwxr-xr-x   - hadoop supergroup          0 2018-07-03 16:37 /user/hadoop
drwxr-xr-x   - hadoop supergroup          0 2018-06-19 10:10 /user/hive
-rw-r--r--   3 hadoop supergroup          0 2018-06-29 10:19 /user/pythontest.txt
drwxr-xr-x   - hadoop supergroup          0 2018-07-05 20:25 /user/sparkstreamingtest-1530793500000.result
[hadoop@172 root]$ hdfs dfs -cat /user/sparkstreamingtest-1530793500000.result/*
(value_6,16)
(value_7,22)
(value_8,18)
(value_0,18)
(value_9,17)
(value_1,18)
(value_2,17)
(value_3,17)
(value_4,16)
(value_5,17)
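The result files can also be read programmatically instead of via hdfs dfs. Below is a minimal sketch using the Hadoop FileSystem API; it is an illustrative addition, not from the original guide (ReadResults is an arbitrary name), and it assumes it runs on a cluster node where the default filesystem is HDFS:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class ReadResults {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // One .result directory is written per batch; adjust the glob to your $hdfsPath prefix.
        FileStatus[] matches = fs.globStatus(new Path("/user/sparkstreamingtest-*.result/part-*"));
        if (matches == null) {
            return; // no result files yet
        }
        for (FileStatus status : matches) {
            try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(fs.open(status.getPath())))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line); // e.g. (value_6,16)
                }
            }
        }
    }
}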
[hadoop@172 ~]$ yarn application -kill $Application-Id