tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

Flink 接入 CKafka

PDF
聚焦模式
字号
最后更新时间: 2026-01-20 15:59:41
Apache Flink 是一个可以处理流数据的实时处理框架,用于在无界和有界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。



Apache Flink 擅长处理无界和有界数据集。Flink runtime 能够通过对时间和状态的精确控制处理无界数据流,也能够使用为固定大小数据集设计的算法和数据结构对有界数据集进行处理,并达到出色的性能。
应用程序可能会使用来自各种数据源(如消息队列或分布式日志,如 Apache Kafka 或 Kinesis)的实时数据。Flink 提供了 Apache Kafka 连接器,用于从 Kafka topic 中读取或者向其中写入数据,可提供一次精确的处理语义。

操作步骤

步骤1:获取 CKafka 实例接入地址

1. 登录 CKafka 控制台
2. 在左侧导航栏选择实例列表,单击实例的“ID”,进入实例基本信息页面。
3. 在实例的基本信息页面的接入方式模块,可获取实例的接入地址,接入地址是生产消费需要用到的 bootstrap-server。



步骤2:创建 Topic

1. 在实例基本信息页面,选择顶部Topic管理页签。
2. 在 Topic 管理页面,单击新建,创建一个名为 test 的 Topic,接下来将以该 Topic 为例介绍如何消费。



步骤3:添加 Maven 依赖

pom.xml 配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>Test-CKafka</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

步骤4:消费 CKafka 中的消息

您可以单击以下页面,查看消费消息的两种方式。通过控制台或打印的日志即可查看消费结果。
通过 VPC 方式消费
通过公网域名方式消费
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class CKafkaConsumerDemo {
public static void main(String args[]) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
//公网接入域名地址,即公网路由地址,在实例详情页的接入方式模块获取。
properties.setProperty("bootstrap.servers", "IP:PORT");
//消费者组id。
properties.setProperty("group.id", "testConsumerGroup");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("topicName", new SimpleStringSchema(), properties));
stream.print();
env.execute();
}
}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class CKafkaConsumerDemo {
public static void main(String args[]) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
//公网接入域名地址,即公网路由地址,在实例详情页的接入方式模块获取。
properties.setProperty("bootstrap.servers", "IP:PORT");
//消费者组id。
properties.setProperty("group.id", "testConsumerGroup");
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "PLAIN");
//用户名和密码,注:用户名是需要拼接,并非控制台的用户名:instanceId#username。
properties.setProperty("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required\\nusername=\\"yourinstanceId#yourusername\\"\\npassword=\\"yourpassword\\";");
properties.setProperty("sasl.kerberos.service.name","kafka");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("topicName", new SimpleStringSchema(), properties));
stream.print();
env.execute();
}
}


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈