Connecting Flume to CKafka

Last updated: 2024-01-09 14:56:36
    Apache Flume is a distributed, reliable, and highly available log collection system that supports a wide variety of data sources, such as HTTP, log files, JMS, and listening ports. It can efficiently collect, aggregate, move, and store massive amounts of log data in a specified storage system such as Kafka, HDFS, or a Solr search server.
    Flume is structured as follows:
    
    Agents are the smallest unit that runs independently in Flume. A Flume agent is a JVM composed of three main components: source, sink, and channel.
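    As a minimal illustration of how these three components fit together, the hypothetical agent below (the names a1, r1, c1, and k1 are placeholders) reads from a netcat source and writes to a logger sink through a memory channel:
    # Name the source, channel, and sink of agent a1
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    # A netcat source listening on a local port
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    # An in-memory channel that buffers events between source and sink
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    # A logger sink that prints events to Flume's log
    a1.sinks.k1.type = logger
    # Wire the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1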
    
    
    
    Flume and Kafka
    When you store data in a downstream storage or compute module such as HDFS or HBase, you need to consider many complex factors, such as the number of concurrent writes, system load, and network latency. As a flexible distributed system, Flume provides various APIs and customizable pipelines.
    In production, Kafka can act as a buffer when producers and consumers run at different paces. Its partitioned, append-only log structure gives it high throughput, and its replication mechanism makes it highly fault-tolerant.
    Therefore, Flume and Kafka can work together to meet most requirements in production environments.

    Connecting Flume to Open-Source Kafka

    Preparations

    Download Apache Flume (v1.6.0 or later is compatible with Kafka).
    Download Kafka (v0.9.x or later is required as v0.8 is no longer supported).
    Confirm that the Kafka source and sink components are included in your Flume installation (see the check below).
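    One quick way to check is to look for the Kafka connector jars in Flume's lib directory (jar names vary by Flume version, so the command below is illustrative):
    # Run from the Flume installation directory
    ls lib | grep -i kafka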

    Connection method

    Kafka can be used as a source or sink to import or export messages.
    Using Kafka as a Source

    Configure Kafka as the message source, that is, pull data from Kafka as a consumer into the specified channel. The main configuration items are as follows:
    Configuration Item         Description
    channels                   The channel(s) the source writes to
    type                       Must be org.apache.flume.source.kafka.KafkaSource
    kafka.bootstrap.servers    Kafka broker server address(es)
    kafka.consumer.group.id    ID of the Kafka consumer group
    kafka.topics               Comma-separated list of source topics in Kafka
    batchSize                  Maximum number of messages written to the channel in one batch
    batchDurationMillis        Maximum interval (in milliseconds) before a batch is written to the channel
    Sample:
    tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
    tier1.sources.source1.channels = channel1
    tier1.sources.source1.batchSize = 5000
    tier1.sources.source1.batchDurationMillis = 2000
    tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
    tier1.sources.source1.kafka.topics = test1, test2
    tier1.sources.source1.kafka.consumer.group.id = custom.g.id
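    Note that this sample defines only the source. A runnable agent also needs channel and sink definitions, for example (a minimal sketch using a memory channel and a logger sink; these choices are illustrative, not part of the official sample):
    tier1.sources = source1
    tier1.channels = channel1
    tier1.sinks = sink1
    # A memory channel to buffer the consumed events
    tier1.channels.channel1.type = memory
    # A logger sink that prints the events for inspection
    tier1.sinks.sink1.type = logger
    tier1.sinks.sink1.channel = channel1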
    For more information, visit Apache Flume's official website.
    Using Kafka as a Sink

    Configure Kafka as the message receiver, that is, push data to the Kafka server as a producer for subsequent processing. The main configuration items are as follows:
    Configuration Item         Description
    channel                    The channel the sink reads from
    type                       Must be org.apache.flume.sink.kafka.KafkaSink
    kafka.bootstrap.servers    Kafka broker server address(es)
    kafka.topic                Target topic in Kafka
    kafka.flumeBatchSize       Number of messages processed per batch
    kafka.producer.acks        Acknowledgment policy of the Kafka producer (0: no acknowledgment; 1: leader acknowledgment only; all or -1: all in-sync replicas)
    Sample:
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = mytopic
    a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
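    As with the source sample, this defines only the sink. A runnable agent also needs source and channel definitions, for example (a minimal sketch using an exec source and a memory channel; the source type and file path are illustrative):
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    # An exec source that tails a local file (path is illustrative)
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /tmp/test.log
    a1.sources.r1.channels = c1
    # A memory channel buffering events for the Kafka sink
    a1.channels.c1.type = memory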
    For more information, visit Apache Flume's official website.

    Connecting Flume to CKafka

    Using CKafka as a Sink

    Step 1. Obtain the CKafka instance access address

    1. Log in to the CKafka console.
    2. Select Instance List on the left sidebar and click the ID of the target instance to enter the instance details page.
    3. You can obtain the instance access address in the Access Mode module on the Basic Info tab page.
    
    
    

    Step 2. Create a topic

    1. On the instance details page, select the Topic Management tab at the top.
    2. On the topic management page, click Create to create a topic named flume_test.
    
    
    

    Step 3. Configure Flume

    1. Download the Apache Flume toolkit and decompress it.
    2. Write the configuration file flume-kafka-sink.properties in the conf folder of the extracted directory. Below is a simple demo. If there is no special requirement, simply replace the instance IP address and topic in the configuration file with your own. In this demo, the source is tail -F flume-test, which picks up lines newly appended to that file.
    
    
    
    The sample configuration is as follows:
    # Demo for using Kafka as the sink
    agentckafka.sources = exectail
    agentckafka.channels = memoryChannel
    agentckafka.sinks = kafkaSink
    
    # Set the source type based on different requirements. If you have a special source, you can configure it by yourself. The simplest example is used here.
    agentckafka.sources.exectail.type = exec
    agentckafka.sources.exectail.command = tail -F ./flume-test
    agentckafka.sources.exectail.batchSize = 20
    # Set the source channel
    agentckafka.sources.exectail.channels = memoryChannel
    
    # Set the sink type. It is set to Kafka here
    agentckafka.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
    # Set the ip:port provided by CKafka
    # Replace with your instance IP address
    agentckafka.sinks.kafkaSink.brokerList = 172.16.16.12:9092
    # Set the topic to which data is to be imported. Create the topic in the CKafka console in advance
    # Replace with your topic name
    agentckafka.sinks.kafkaSink.topic = flume_test
    # Set the sink channel
    agentckafka.sinks.kafkaSink.channel = memoryChannel
    
    # Use the default configuration for the channel
    # Each channel's type is defined
    agentckafka.channels.memoryChannel.type = memory
    agentckafka.channels.memoryChannel.keep-alive = 10
    
    # Other config values specific to each type of channel (sink or source) can be defined as well
    # In this case, it specifies the capacity of the memory channel
    agentckafka.channels.memoryChannel.capacity = 1000
    agentckafka.channels.memoryChannel.transactionCapacity = 1000
    3. Run the following command to start Flume. The -n option specifies the agent name (which must match the agentckafka prefix in the configuration file), -c the configuration directory, and -f the agent configuration file:
    ./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-sink.properties
    4. Write messages to the flume-test file. Flume will then deliver these messages to CKafka.
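    For example, you can append a test line (the message text is arbitrary):
    echo "hello ckafka" >> flume-test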
    
    
    
    5. Start the CKafka client for consumption.
    ./kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:xxxx --topic flume_test --from-beginning
    Note:
    Set bootstrap-server to the access address of the CKafka instance obtained in step 1 and topic to the name of the topic you created.
    You can see that the messages have been consumed.
    
    
    

    Using CKafka as a Source

    Step 1. Obtain the CKafka instance access address

    1. Log in to the CKafka console.
    2. Select Instance List on the left sidebar and click the ID of the target instance to enter the instance details page.
    3. You can obtain the instance access address in the Access Mode module on the Basic Info tab page.
    
    
    

    Step 2. Create a topic

    1. On the instance details page, select the Topic Management tab at the top.
    2. On the topic management page, click Create to create a topic named flume_test.
    
    

    Step 3. Configure Flume

    1. Download the Apache Flume toolkit and decompress it.
    2. Write the configuration file flume-kafka-source.properties in the conf folder of the extracted directory. Below is a simple demo. If there is no special requirement, simply replace the instance IP address and topic in the configuration file with your own. The sink is logger in this example, as shown in the sketch below.
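    A minimal sketch of such a configuration, built from the KafkaSource parameters described earlier and assuming the instance address 172.16.16.12:9092 and an illustrative consumer group ID (replace both with your own values):
    # Demo for using Kafka as the source
    agentckafka.sources = kafkaSource
    agentckafka.channels = memoryChannel
    agentckafka.sinks = loggerSink

    # Kafka source that consumes from the CKafka topic
    agentckafka.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
    # Replace with your instance IP address
    agentckafka.sources.kafkaSource.kafka.bootstrap.servers = 172.16.16.12:9092
    # Replace with your topic name
    agentckafka.sources.kafkaSource.kafka.topics = flume_test
    # Consumer group ID (illustrative)
    agentckafka.sources.kafkaSource.kafka.consumer.group.id = flume.test.group
    agentckafka.sources.kafkaSource.channels = memoryChannel

    # Logger sink that writes consumed events to logs/flume.log
    agentckafka.sinks.loggerSink.type = logger
    agentckafka.sinks.loggerSink.channel = memoryChannel

    # Memory channel buffering events between source and sink
    agentckafka.channels.memoryChannel.type = memory
    agentckafka.channels.memoryChannel.capacity = 1000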
    
    
    
    3. Run the following command to start Flume:
    ./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-source.properties
    4. View the logger output information. The default path is logs/flume.log.
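    If no messages appear, you can produce a few test messages to the topic with the standard Kafka console producer shipped with Kafka (the address and port are placeholders):
    ./kafka-console-producer.sh --broker-list xx.xx.xx.xx:xxxx --topic flume_test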
    
    
    