tencent cloud

Elastic MapReduce

  • Release Notes and Announcements
  • Product Introduction
  • Purchase Guide
    • EMR on CVM Billing Instructions
    • EMR on TKE Billing Instructions
    • EMR Serverless HBase Billing Instructions
    • EMR Serverless TCBase Billing Overview
  • Getting Started
  • EMR on CVM Operation Guide
    • Planning Cluster
    • Administrative rights
    • Configuring Cluster
    • Managing Cluster
    • Managing Service
    • Monitoring and Alarms
    • TCInsight
  • EMR on TKE Operation Guide
  • EMR Serverless HBase Operation Guide
  • EMR Serverless TCBase Operation Guide
  • EMR Development Guide
    • Hadoop Development Guide
    • Spark Development Guide
    • Hbase Development Guide
    • Phoenix on Hbase Development Guide
    • Hive Development Guide
    • Presto Development Guide
    • Sqoop Development Guide
    • Hue Development Guide
    • Oozie Development Guide
    • Flume Development Guide
    • Kerberos Development Guide
    • Knox Development Guide
    • Alluxio Development Guide
    • Kylin Development Guide
    • Livy Development Guide
    • Kyuubi Development Guide
    • Zeppelin Development Guide
    • Hudi Development Guide
    • Superset Development Guide
    • Impala Development Guide
    • Druid Development Guide
    • TensorFlow Development Guide
    • Kudu Development Guide
    • Ranger Development Guide
    • Kafka Development Guide
    • StarRocks Development Guide
    • Flink Development Guide
    • JupyterLab Development Guide
    • MLflow Development Guide
  • Practical Tutorial
    • Practice of EMR on CVM Ops
    • Data Migration
    • Practical Tutorial on Custom Scaling
  • API Documentation
    • History
    • Introduction
    • API Category
    • Making API Requests
    • Cluster Resource Management APIs
    • Cluster Services APIs
    • User Management APIs
    • Information Query APIs
    • Scaling APIs
    • Configuration APIs
    • Other APIs
    • Cluster Lifecycle APIs
    • Serverless HBase APIs
    • YARN Resource Scheduling APIs
    • Data Types
    • Error Codes
  • FAQs
    • EMR on CVM
  • Service Level Agreement
  • Contact Us
문서Elastic MapReduceEMR Development GuideFlume Development GuideStoring Kafka Data in HDFS or COS Through Flume

Storing Kafka Data in HDFS or COS Through Flume

Download
포커스 모드
폰트 크기
마지막 업데이트 시간: 2025-02-12 16:42:49

Scenario Description

Collecting and saving the data in Kafka to HDFS or COS via Flume

Development Preparations

This task requires access to CKafka, so you need to create a CKafka instance first. For more information, see Message Queue CKafka.
Create an EMR cluster. When creating the EMR cluster, you need to select the Flume component on the software configuration page and enable access to COS on the basic configuration page.

Using the Kafka Toolkit in the EMR Cluster

First, you need to check the private IP and port number of CKafka. Log in to the CKafka console, select the CKafka instance you want to use, and find the private IP ($kafkaIP) and port number (generally 9092) in the basic information section. Create a topic named kafka_test on the topic management page.

Configuring Flume

1. Create a Flume configuration file kafka.properties.
vim kafka.properties
agent.sources = kafka_source
agent.channels = mem_channel
agent.sinks = hdfs_sink
# The following is for source configuration.
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.channels = mem_channel
agent.sources.kafka_source.batchSize = 5000
agent.sources.kafka_source.kafka.bootstrap.servers = $kafkaIP:9092
agent.sources.kafka_source.kafka.topics = kafka_test
# The following is for sink configuration.
agent.sinks.hdfs_sink.type = hdfs
agent.sinks.hdfs_sink.channel = mem_channel
agent.sinks.hdfs_sink.hdfs.path = /data/flume/kafka/%Y%m%d (or cosn://bucket/xxx)
agent.sinks.hdfs_sink.hdfs.rollSize = 0
agent.sinks.hdfs_sink.hdfs.rollCount = 0
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent.sinks.hdfs_sink.hdfs.fileType=DataStream
agent.sinks.hdfs_sink.hdfs.useLocalTimeStamp=true
agent.sinks.hdfs_sink.hdfs.writeFormat=Text
# The following is for channel configuration.
agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 100000
agent.channels.mem_channel.transactionCapacity = 10000
2. Run Flume.
./bin/flume-ng agent --conf ./conf/ -f kafka.properties -n agent -Dflume.root.logger=INFO,console
3. Run Kafka producer.
[hadoop@172 kafka]$ ./bin/kafka-console-producer.sh --broker-list $kafkaIP:9092 --topic kafka_test
test
hello

Testing

Enter information on the Kafka producer client and press Enter.
Check whether the corresponding directory and file hadoop fs -ls /data/flume/kafka/ have been generated in HDFS.

References

도움말 및 지원

문제 해결에 도움이 되었나요?

피드백