As an extension of Spark Core, Spark Streaming is used for high-throughput and fault-tolerant processing of continuous data. Currently supported external input sources include Kafka, Flume, HDFS/S3, Kinesis, Twitter, and TCP socket.
Spark Streaming abstracts continuous data into a Discretized Stream (DStream), which consists of a series of continuous resilient distributed datasets (RDDs). Each RDD contains the data generated within a certain time interval, so applying operations to a DStream actually applies them to each of the underlying RDDs.
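The following is a minimal sketch (assuming a local Spark installation and a text stream on TCP port 9999, both illustrative choices rather than part of this guide) showing that each operation applied to a DStream is executed on the RDD of every batch:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountSketch extends App {
  // Split the continuous stream into 1-second batches; each batch is one RDD
  val conf = new SparkConf().setAppName("WordCountSketch").setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(1))

  // A DStream backed by the TCP socket source (port 9999 is an arbitrary example)
  val lines = ssc.socketTextStream("localhost", 9999)

  // Each DStream operation below is applied to the RDD of every batch
  val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
  counts.print()

  ssc.start()
  ssc.awaitTermination()
}
```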
When Kafka is used as the data input source for Spark Streaming, the following stable and experimental versions of the Kafka integration are supported:
Kafka Version | spark-streaming-kafka-0.8 | spark-streaming-kafka-0.10 |
---|---|---|
Broker Version | 0.8.2.1 or later | 0.10.0 or later |
API Maturity | Deprecated | Stable |
Language Support | Scala, Java, and Python | Scala and Java |
Receiver DStream | Yes | No |
Direct DStream | Yes | Yes |
SSL / TLS Support | No | Yes |
Offset Commit API | No | Yes |
Dynamic Topic Subscription | No | Yes |
Currently, CKafka is compatible with Kafka versions 0.9 and above. The Kafka dependency of v0.10.2.1 is used in this practice scenario.
In addition, Spark Streaming in EMR also supports direct connection to CKafka. For more information, see Connecting Spark Streaming to CKafka.
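As a hedged illustration of the Direct DStream approach listed in the table above, the following sketch consumes the `test` topic with the spark-streaming-kafka-0-10 API. It assumes the spark-streaming and spark-streaming-kafka-0-10 dependencies are on the classpath; the consumer group name `spark_stream_test`, the 5-second batch interval, and the broker address (the same placeholder private IP and port used in the producer example below) are illustrative assumptions, not values prescribed by this guide:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object DirectStreamSketch extends App {
  val conf = new SparkConf().setAppName("DirectStreamSketch")
  val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batches (placeholder interval)

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "172.16.16.12:9092",        // Private IP and port of the CKafka instance
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "spark_stream_test",                  // Placeholder consumer group name
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // Direct DStream: partitions map 1:1 to Kafka partitions, with no receiver involved
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](Array("test"), kafkaParams)
  )

  // Print the value of every message received in each batch
  stream.map(_.value).print()

  ssc.start()
  ssc.awaitTermination()
}
```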
Before producing and consuming messages, prepare the following:
- Obtain the private IP and port of your CKafka instance, which serve as the `bootstrap-server` required by production and consumption.
- Create a topic named `test`. This topic is used as an example below to describe how to produce and consume messages.
- Prepare a server running CentOS 6.8 with the following packages installed:
Package | Version |
---|---|
sbt | 0.13.16 |
Hadoop | 2.7.3 |
Spark | 2.1.0 |
Protobuf | 2.5.0 |
SSH | Installed on CentOS by default |
Java | 1.8 |
For specific installation steps, see [Configuring environment](#Configuring environment).
The Kafka dependency of v0.10.2.1 is used here.
Configure `build.sbt`:

```scala
name := "Producer Example"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.1"
```
`producer_example.scala`:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer._

object ProducerExample extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "172.16.16.12:9092") // Private IP and port in the instance information
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val TOPIC = "test" // Specify the topic to produce to

  for (i <- 1 to 50) {
    // Produce a message whose key is "key" and value is "hello $i"
    val record = new ProducerRecord(TOPIC, "key", s"hello $i")
    producer.send(record)
  }

  val record = new ProducerRecord(TOPIC, "key", "the end " + new java.util.Date)
  producer.send(record)

  producer.close() // Disconnect at the end
}
```
For more information on how to use `ProducerRecord`, see the ProducerRecord documentation.
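As a small hedged sketch of the other `ProducerRecord` constructors (reusing the `producer` and `TOPIC` values defined in `producer_example.scala` above; the values shown are illustrative), a record can also omit the key or target a specific partition:

```scala
// Key omitted: the producer's partitioner chooses the partition
producer.send(new ProducerRecord[String, String](TOPIC, "value only"))
// Explicit partition 0, with a key and a value
producer.send(new ProducerRecord[String, String](TOPIC, 0, "key", "to partition 0"))
```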
Create an `sbt_run.sh` script with the following content in the sbt directory and add the executable permission:

```bash
#!/bin/bash
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $SBT_OPTS -jar `dirname $0`/bin/sbt-launch.jar "$@"
```
```bash
chmod u+x ./sbt_run.sh
./sbt_run.sh sbt-version
```
Compile and install Protobuf, then verify that it is available:

```bash
./configure
make && make install
protoc --version
```
Create a `hadoop` user and grant it admin permissions:

```bash
useradd -m hadoop -s /bin/bash
visudo
```

In the editor, add the following line below `root ALL=(ALL) ALL`:

```
hadoop ALL=(ALL) ALL
```
Switch to the `hadoop` user and configure passwordless SSH login:

```bash
su hadoop
cd ~/.ssh/                         # If there is no such directory, run `ssh localhost` first
ssh-keygen -t rsa                  # There will be prompts. Simply press Enter
cat id_rsa.pub >> authorized_keys  # Add authorization
chmod 600 ./authorized_keys        # Modify the file permission
```
Install OpenJDK 1.8:

```bash
sudo yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel
```

Configure `${JAVA_HOME}` by editing `/etc/profile` with `vim /etc/profile` and appending:

```bash
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.121-0.b13.el6_8.x86_64/jre
export PATH=$PATH:$JAVA_HOME
```
Verify the Hadoop installation:

```bash
./bin/hadoop version
```

Add Hadoop to the environment variables by editing `/etc/profile` with `vim /etc/profile` and appending:

```bash
export HADOOP_HOME=/usr/local/hadoop
export PATH=$HADOOP_HOME/bin:$PATH
```
Modify `./etc/hadoop/core-site.xml`:

```xml
<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>file:/usr/local/hadoop/tmp</value>
        <description>A base for other temporary directories.</description>
    </property>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>
```
Modify `./etc/hadoop/hdfs-site.xml`:

```xml
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:/usr/local/hadoop/tmp/dfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:/usr/local/hadoop/tmp/dfs/data</value>
    </property>
</configuration>
```
Set `JAVA_HOME` in `./etc/hadoop/hadoop-env.sh` to the Java path:

```bash
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.121-0.b13.el6_8.x86_64/jre
```
Format the NameNode and start HDFS:

```bash
./bin/hdfs namenode -format
./sbin/start-dfs.sh
```
Download the required version from Spark's official website. As Hadoop has already been installed, select "Pre-built with user-provided Apache Hadoop" here.
Note: This example also uses the `hadoop` user for operations.
Modify the configuration file:

```bash
cp ./conf/spark-env.sh.template ./conf/spark-env.sh
vim ./conf/spark-env.sh
```

Add the following line:

```bash
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
```
Run the sample program to verify the installation:

```bash
bin/run-example SparkPi
```