tencent cloud

弹性 MapReduce

动态与公告
产品动态
产品公告
安全公告
产品简介
产品概述
产品优势
产品架构
产品功能
应用场景
约束与限制
技术支持范围
产品发行版
购买指南
EMR on CVM 计费说明
EMR on TKE 计费说明
EMR Serverless HBase 计费说明
快速入门
EMR on CVM 快速入门
EMR on TKE 快速入门
EMR on CVM 操作指南
规划集群
管理权限
配置集群
管理集群
管理服务
监控告警
智能管家
EMR on TKE 操作指南
EMR on TKE 简介
配置集群
管理集群
管理服务
监控运维
应用分析
EMR Serverless HBase 操作指南
EMR Serverless HBase 产品简介
配额与限制
规划实例
管理实例
监控告警
开发指南
EMR 开发指南
Hadoop开发指南
Spark 开发指南
HBASE开发指南
Phoenix on Hbase 开发指南
Hive 开发指南
Presto开发指南
Sqoop 开发指南
Hue 开发指南
Oozie 开发指南
Flume 开发指南
Kerberos 开发指南
Knox 开发指南
Alluxio 开发指南
Kylin 开发指南
Livy 开发指南
Kyuubi 开发指南
Zeppelin 开发指南
Hudi 开发指南
Superset 开发指南
Impala 开发指南
Druid 开发指南
Tensorflow 开发指南
Kudu 开发指南
Ranger 开发指南
Kafka 开发指南
Iceberg 开发指南
StarRocks 开发指南
Flink 开发指南
JupyterLab 开发指南
MLflow 开发指南
实践教程
EMR on CVM 运维实践
数据迁移实践
自定义伸缩实践教程
API 文档
History
Introduction
API Category
Cluster Resource Management APIs
Cluster Services APIs
User Management APIs
Data Inquiry APIs
Scaling APIs
Configuration APIs
Other APIs
Serverless HBase APIs
YARN Resource Scheduling APIs
Making API Requests
Data Types
Error Codes
常见问题
EMR on CVM常见问题
服务等级协议
联系我们
文档弹性 MapReduceEMR 开发指南Flume 开发指南Kafka 数据通过 Flume 存储到 Hbase

Kafka 数据通过 Flume 存储到 Hbase

PDF
聚焦模式
字号
最后更新时间: 2025-02-12 16:42:49

场景说明

将 Kafka 中的数据通过 Flume 收集并存储到 Hbase。

开发准备

因为任务中需要访问腾讯云消息队列 CKafka,所以需要先创建一个 CKafka 实例,具体见 消息队列 CKafka
确认您已开通腾讯云,且已创建一个 EMR 集群。创建 EMR 集群时,需要在软件配置界面选择 Flume 组件。

在 EMR 集群使用 Kafka 工具包

首先需要查看 CKafka 的内网 IP 和端口号。登录消息队列 CKafka 的控制台,选择您要使用的 CKafka 实例,在基本消息中查看其内网 IP 为 $kafkaIP,而端口号一般默认为9092。在 topic 管理界面新建一个 topic 为 kafka_test。

配置 flume

1. 创建 flume 的配置文件 hbase_kafka.properties
vim hbase_kafka.properties
agent.sources = kafka_source
agent.channels = mem_channel
agent.sinks = hbase_sink
# 以下配置 source
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.channels = mem_channel
agent.sources.kafka_source.batchSize = 5000
agent.sources.kafka_source.kafka.bootstrap.servers = $kafkaIP:9092
agent.sources.kafka_source.kafka.topics = kafka_test
# 以下配置 sink
agent.sinks.hbase_sink.channel = mem_channel
agent.sinks.hbase_sink.table = foo_table
agent.sinks.hbase_sink.columnFamily = cf
agent.sinks.hbase_sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# 以下配置 channel
agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 100000
agent.channels.mem_channel.transactionCapacity = 10000
2. 创建 hbase 表
hbase shell
create 'foo_table','cf'
3. 运行 flume
./bin/flume-ng agent --conf ./conf/ -f hbase_kafka.properties -n agent -Dflume.root.logger=INFO,console
4. 运行 kafka producer
[hadoop@172 kafka]$ ./bin/kafka-console-producer.sh --broker-list $kafkaIP:9092 --topic kafka_test
hello
hbase_test

测试

在 kafka 生产者客户端输入信息并回车。
观察 hbase 表中是否有相应数据。

参考文档

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈