Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
Apache Flink excels at processing unbounded and bounded data sets. Precise control of time and state enables Flink's runtime to run any kind of application on unbounded streams. Bounded streams are internally processed by algorithms and data structures that are specifically designed for fixed-size data sets, yielding excellent performance.
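The idea of a stateful computation can be illustrated without any Flink API. The following is a minimal plain-Java sketch, not Flink code: the class RunningCounter and its methods are hypothetical names. It keeps per-key state that is updated once per event. On a bounded input, the final counts are available when the input ends; on an unbounded input, the same logic would simply keep emitting updated counts. Flink manages (and checkpoints) such keyed state for you.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of per-key state; this is not a Flink API.
public class RunningCounter {
    // One counter per key, analogous to Flink's keyed state.
    private final Map<String, Long> countsByKey = new HashMap<>();

    // Processes one event and returns the updated count for its key.
    public long process(String key) {
        return countsByKey.merge(key, 1L, Long::sum);
    }

    // Returns the current count for a key (0 if the key has not been seen).
    public long count(String key) {
        return countsByKey.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        RunningCounter counter = new RunningCounter();
        // A bounded input: processing ends when the list is exhausted.
        List<String> events = Arrays.asList("click", "view", "click");
        for (String event : events) {
            // Prints "click -> 1", "view -> 1", "click -> 2".
            System.out.println(event + " -> " + counter.process(event));
        }
    }
}
```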
Flink streaming applications typically read real-time data from external sources such as Apache Kafka or Amazon Kinesis. Flink provides dedicated Kafka connectors for reading data from and writing data to Kafka topics. When checkpointing is enabled, the Flink Kafka consumer provides exactly-once processing semantics.
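Exactly-once processing in the Kafka consumer relies on Flink's checkpoints: the offsets the source has read are stored in each checkpoint together with the state of the other operators, so after a failure both are restored to the same consistent point and the source re-reads records from the checkpointed offset. The plain-Java simulation below sketches that mechanism under simplifying assumptions (one partition, one operator, a simulated failure). CheckpointedSum, Snapshot and their methods are hypothetical names, not Flink APIs.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of checkpointing offsets and state together (not Flink code).
public class CheckpointedSum {
    // A consistent snapshot: the next offset to read and the state at that point.
    static final class Snapshot {
        final int nextOffset;
        final long sum;

        Snapshot(int nextOffset, long sum) {
            this.nextOffset = nextOffset;
            this.sum = sum;
        }
    }

    private final List<Integer> log; // stands in for a single Kafka partition
    private int nextOffset = 0;       // position of the next record to read
    private long sum = 0;             // operator state

    CheckpointedSum(List<Integer> log) {
        this.log = log;
    }

    // Reads and processes up to n records, stopping at the end of the log.
    void processRecords(int n) {
        for (int i = 0; i < n && nextOffset < log.size(); i++) {
            sum += log.get(nextOffset);
            nextOffset++;
        }
    }

    // Captures offset and state as one unit.
    Snapshot checkpoint() {
        return new Snapshot(nextOffset, sum);
    }

    // After a failure, restores offset and state from the same snapshot.
    void restore(Snapshot snapshot) {
        nextOffset = snapshot.nextOffset;
        sum = snapshot.sum;
    }

    long sum() {
        return sum;
    }

    public static void main(String[] args) {
        CheckpointedSum job = new CheckpointedSum(Arrays.asList(1, 2, 3, 4, 5));
        job.processRecords(2);                 // reads 1, 2
        Snapshot snapshot = job.checkpoint();  // offset 2, sum 3
        job.processRecords(2);                 // reads 3, 4 (sum 10), then "fails"
        job.restore(snapshot);                 // back to offset 2, sum 3
        job.processRecords(Integer.MAX_VALUE); // re-reads 3, 4 and reads 5
        System.out.println(job.sum());         // prints 15: each record counted once
    }
}
```

If only the offset were rewound while the state kept its post-failure value, records 3 and 4 would be counted twice; restoring both from one snapshot is what keeps the result exactly-once.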
Before you start, obtain the bootstrap-server address required for producing and consuming messages, and create a topic named test. This topic is used as an example below to describe how to consume messages.
Configure pom.xml as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>Test-CKafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- The Kafka client library (kafka-clients) is included transitively by flink-connector-kafka. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>
        <!-- Keep all Flink artifacts on the same version. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
The following code consumes messages from the topic with Flink. You can view the consumption results in the console or in the printed logs.
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class CKafkaConsumerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        // Domain name address for public network access (the public routing address),
        // which can be obtained in the access mode module of the instance details page.
        properties.setProperty("bootstrap.servers", "IP:PORT");
        // Consumer group ID.
        properties.setProperty("group.id", "testConsumerGroup");

        // Read each record value of the topic as a UTF-8 string.
        // Replace "topicName" with your topic name, for example test.
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer<>("topicName", new SimpleStringSchema(), properties));
        // Print every consumed message to standard output.
        stream.print();
        // Submit and run the job; because the Kafka source is unbounded, it runs until cancelled.
        env.execute();
    }
}
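When the job runs with a parallelism greater than one, each parallel instance (subtask) of the Kafka source reads a subset of the topic's partitions. Flink's Kafka consumer assigns partitions to subtasks itself rather than through Kafka's consumer-group rebalancing; group.id is used for committing offsets back to Kafka and for starting from committed group offsets. The sketch below mirrors, to the best of our understanding, the rule in Flink's internal KafkaTopicPartitionAssigner class, which may differ between Flink versions. PartitionAssignment and assign are illustrative names, not part of the connector's public API.

```java
// Illustrative re-implementation of Flink's partition-to-subtask rule (assumption: internal details may change).
public class PartitionAssignment {
    // Returns the index of the source subtask that reads the given partition.
    static int assign(String topic, int partition, int numParallelSubtasks) {
        // Topic-dependent start index, masked so it is never negative.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // Partition IDs start at 0, so partitions are dealt out round-robin from startIndex.
        return (startIndex + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        String topic = "test";
        int parallelism = 2;
        for (int partition = 0; partition < 4; partition++) {
            System.out.println("partition " + partition + " -> subtask "
                    + assign(topic, partition, parallelism));
        }
    }
}
```

With four partitions and a parallelism of 2, each subtask reads two partitions; if the parallelism exceeds the number of partitions, some source subtasks receive no partition and stay idle.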