tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

Flume 接入 CKafka

PDF
聚焦模式
字号
最后更新时间: 2026-01-20 15:59:41
Apache Flume 是一个分布式、可靠、高可用的日志收集系统,支持各种各样的数据来源(如 HTTP、Log 文件、JMS、监听端口数据等),能将这些数据源的海量日志数据进行高效收集、聚合、移动,最后存储到指定存储系统中(如 Kafka、分布式文件系统、Solr 搜索服务器等)。
Flume 基本结构如下:



Flume 以 agent 为最小的独立运行单位。一个 agent 就是一个 JVM,单个 agent 由 Source、Sink 和 Channel 三大组件构成。


Flume 与 Kafka
把数据存储到 HDFS 或者 HBase 等下游存储模块或者计算模块时需要考虑各种复杂的场景,例如并发写入的量以及系统承载压力、网络延迟等问题。Flume 作为灵活的分布式系统具有多种接口,同时提供可定制化的管道。 在生产处理环节中,当生产与处理速度不一致时,Kafka 可以充当缓存角色。Kafka 拥有 partition 结构以及采用 append 追加数据,使 Kafka 具有优秀的吞吐能力;同时其拥有 replication 结构,使 Kafka 具有很高的容错性。 所以将 Flume 和 Kafka 结合起来,可以满足生产环境中绝大多数要求。

Flume 接入开源 Kafka

准备工作

下载 Apache Flume (1.6.0以上版本兼容 Kafka)
下载 Kafka工具包 (0.9.x以上版本,0.8已经不支持)
确认 Kafka 的 Source、 Sink 组件已经在 Flume 中。

接入方式

Kafka 可作为 Source 或者 Sink 端对消息进行导入或者导出。
Kafka Source
Kafka Sink
配置 kafka 作为消息来源,即将自己作为消费者,从 Kafka 中拉取数据传入到指定 Sink 中。主要配置选项如下:
配置项
说明
channels
自己配置的 Channel
type
必须为:org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers
Kafka Broker 的服务器地址
kafka.consumer.group.id
作为 Kafka 消费端的 Group ID
kafka.topics
Kafka 中数据目标 Topic
batchSize
每次写入 Channel 的大小
batchDurationMillis
每次写入最大间隔时间
示例:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
更多内容请参见 Apache Flume 官网
配置 Kafka 作为内容接收方,即将自己作为生产者,推到 Kafka Server 中等待后续操作。主要配置选项如下:
配置项
说明
channel
自己配置的 Channel
type
必须为:org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers
Kafka Broker 的服务器
kafka.topics
Kafka 中数据来源 Topic
kafka.flumeBatchSize
每次写入的 Bacth 大小
kafka.producer.acks
Kafka 生产者的生产策略
示例:
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
更多内容请参见 Apache Flume 官网

Flume 接入 CKafka

使用 CKafka 作为 Sink
使用 CKafka 作为 Source

步骤1:获取 CKafka 实例接入地址

1. 登录 CKafka 控制台
2. 在左侧导航栏选择实例列表,单击实例的“ID”,进入实例基本信息页面。
3. 在实例的基本信息页面的接入方式模块,可获取实例的接入地址。



步骤2:创建 Topic

1. 在实例基本信息页面,选择顶部 Topic 管理页签。
2. 在 Topic 管理页面,单击新建,创建一个名为 flume_test 的 Topic。



步骤3:配置 Flume

2. 编写配置文件 flume-kafka-sink.properties,以下是一个简单的 Java 语言 Demo(配置在解压目录的 conf 文件夹下),若无特殊要求则将自己的实例 IP 与 Topic 替换到配置文件当中即可。本例使用的 source 为 tail -F flume-test ,即文件中新增的信息。


代码示例如下:
# 以kafka作为sink的demo
agentckafka.source = exectail
agentckafka.channels = memoryChannel
agentckafka.sinks = kafkaSink

# 设置source类型,根据不同需求而设置。若有特殊source可自行配置,此处使用最简单的例子
agentckafka.sources.exectail.type = exec
agentckafka.sources.exetail.command = tail -F ./flume.test
agentckafka.sources.exectail.batchSize = 20
# 设置source channel
agentckafka.sources.exectail.channels = memoryChannel

# 设置sink类型,此处设置为kafka
agentckafka.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
# 此处设置ckafka提供的ip:port
agentckafka.sinks.kafkaSink.brokerList = 172.16.16.12:9092 # 配置实例IP
# 此处设置需要导入数据的topic,请先在控制台提前创建好topic
agentckafka.sinks.kafkaSink.topic = flume test #配置topic
# 设置sink channel
agentckafka.sinks.kafkaSink.channel = memoryChannel

# Channel使用默认配置
# Each channel's type is defined.
agentckafka.channels.memoryChannel.type = memory
agentckafka.channels.memoryChannel.keep-alive = 10

# Other config values specific to each type of channel(sink or source) can be defined as well
# In this case, it specifies the capacity of the memory channel
agentckafka.channels.memoryChannel.capacity = 1000
agentckafka.channels.memoryChannel.transactionCapacity = 1000
3. 执行如下命令启动 Flume。
./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-sink.properties
4. 写入消息到 flume-test 文件中,此时消息将由 Flume 写入到 CKafka。


5. 启动 CKafka 客户端进行消费。
./kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:xxxx --topic flume_test --from-beginning --new-consumer
说明
bootstrap-server 填写刚创建的 CKafka 实例的接入地址,topic 填写刚创建的 Topic 名称。
可以看到刚才的消息被消费出来。



步骤1:获取 CKafka 实例接入地址

1. 登录 CKafka 控制台
2. 在左侧导航栏选择实例列表,单击实例的“ID”,进入实例基本信息页面。
3. 在实例的基本信息页面的接入方式模块,可获取实例的接入地址。



步骤2:创建 Topic

1. 在实例基本信息页面,选择顶部 Topic 管理页签。
2. 在 Topic 管理页面,单击新建,创建一个名为 flume_test 的 Topic。





步骤3:配置 Flume

2. 编写配置文件 flume-kafka-source.properties,以下是一个简单的 Demo(配置在解压目录的 conf 文件夹下)。若无特殊要求则将自己的实例 IP 与 Topic 替换到配置文件当中即可。此处使用的 sink 为 logger。

3. 执行如下命令启动 Flume。
./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-source.properties
4. 查看 logger 输出信息(默认路径为logs/flume.log)。



帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈