tencent cloud

Connecting Filebeat to CKafka
Last updated: 2024-01-09 14:56:36
The Beats platform offers various single-purpose data shippers. Once installed, these shippers can be used as lightweight agents to send the collected data from hundreds or thousands of machines to the target systems.


Beats offers a wide variety of shippers. You can download the most appropriate one based on your needs. This document uses Filebeat, a lightweight log shipper, as an example to describe how to connect Filebeat to CKafka and handle common problems that may occur after the connection.

Prerequisites

You have downloaded and installed Filebeat. For more information, see Filebeat quick start: installation and configuration.
You have downloaded and installed JDK 8. For more information, see Java Downloads.
You have created a CKafka instance. For more information, see Creating Instance.

Directions

Step 1. Obtain the CKafka instance access address

1. Log in to the CKafka console.
2. Select Instance List on the left sidebar and click the ID of the target instance to enter the instance details page.
3. You can obtain the instance access address in the Access Mode module on the Basic Info tab page.




Step 2. Create a topic

1. On the instance details page, select the Topic Management tab at the top.
2. On the topic management page, click Create to create a topic named test.




Step 3. Prepare the configuration file

Go to the Filebeat installation directory and create the configuration file filebeat.yml.
#======= For Filebeat 7.x or later versions, change `filebeat.prospectors` to `filebeat.inputs` and `input_type` to `type` =======
filebeat.prospectors:

- input_type: log

  # Paths of the log files to monitor.
  paths:
    - /var/log/messages

#======= Outputs =========

#------------------ kafka -------------------------------------
output.kafka:
  # Set the value to the open-source version of the CKafka instance
  version: 0.10.2
  # Set to the access address of the CKafka instance
  hosts: ["xx.xx.xx.xx:xxxx"]
  # Set the name of the target topic
  topic: 'test'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: none
  max_message_bytes: 1000000

  # The following parameters need to be configured for SASL. If SASL is not required, skip them.
  # Concatenate the instance ID and username with "#"
  username: "yourinstance#yourusername"
  password: "yourpassword"


Step 4. Use Filebeat to send a message

1. Run the following command to start the client:
sudo ./filebeat -e -c filebeat.yml
2. Append data to the monitored file. This example uses a file named testlog, which must be listed under paths in filebeat.yml.
echo ckafka1 >> testlog
echo ckafka2 >> testlog
echo ckafka3 >> testlog
3. Start a consumer on the corresponding topic. It should receive data similar to the following:
{"@timestamp":"2017-09-29T10:01:27.936Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka1","offset":500,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
{"@timestamp":"2017-09-29T10:01:30.936Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka2","offset":508,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
{"@timestamp":"2017-09-29T10:01:33.937Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka3","offset":516,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
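Each record the consumer receives is a JSON document that wraps the original log line in its message field. The following is a minimal Python sketch, using only the standard library, of how a downstream program might recover the original lines. The function name extract_messages and the shortened sample events are illustrative assumptions, not part of Filebeat or CKafka.

```python
import json


def extract_messages(lines):
    """Return the original log lines carried in Filebeat JSON events."""
    messages = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines between events
        event = json.loads(line)
        messages.append(event["message"])
    return messages


# Shortened sample events (hypothetical); real events carry more fields,
# such as "beat" and "source".
sample = [
    '{"@timestamp":"2017-09-29T10:01:27.936Z","message":"ckafka1","offset":500,"type":"log"}',
    '{"@timestamp":"2017-09-29T10:01:30.936Z","message":"ckafka2","offset":508,"type":"log"}',
]

print(extract_messages(sample))  # ['ckafka1', 'ckafka2']
```

In practice the lines would come from whichever Kafka consumer you use; the sketch only covers the decoding step.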

SASL/PLAINTEXT mode

To use SASL/PLAINTEXT, set the username and password under output.kafka in the configuration file.
output.kafka:
  # The following parameters need to be configured for SASL. If SASL is not required, skip them.
  # Concatenate the instance ID and username with "#"
  username: "yourinstance#yourusername"
  password: "yourpassword"
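If the configuration file is generated by a script, the username value can be assembled as in the following Python sketch. The helper name ckafka_sasl_username is hypothetical, and the placeholder values are the ones used in the configuration above. Keep the value quoted when writing it to filebeat.yml, because YAML treats a "#" that follows whitespace as the start of a comment.

```python
def ckafka_sasl_username(instance_id: str, username: str) -> str:
    """Join the instance ID and username with '#', as the SASL config expects."""
    if not instance_id or not username:
        raise ValueError("instance_id and username must both be non-empty")
    return f"{instance_id}#{username}"


print(ckafka_sasl_username("yourinstance", "yourusername"))  # yourinstance#yourusername
```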

FAQs

The Filebeat log file (default path: /var/log/filebeat/filebeat) contains a large number of INFO logs as follows:
2019-03-20T08:55:02.198+0800 INFO kafka/log.go:53 producer/broker/544 starting up
2019-03-20T08:55:02.198+0800 INFO kafka/log.go:53 producer/broker/544 state change to [open] on wp-news-filebeat/4
2019-03-20T08:55:02.198+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/4 selected broker 544
2019-03-20T08:55:02.198+0800 INFO kafka/log.go:53 producer/broker/478 state change to [closing] because EOF
2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 Closed connection to broker bitar1d12:9092
2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/5 state change to [retrying-3]
2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/4 state change to [flushing-3]
2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/5 abandoning broker 478
2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/2 state change to [retrying-2]
2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/2 abandoning broker 541
2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/3 state change to [retrying-2]
2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/broker/478 shut down
This problem may be related to the Filebeat version. Elastic products are updated frequently, and incompatibilities between major versions are common.
For example, Filebeat v6.5.x supports Kafka v0.9, v0.10, v1.1.0, and v2.0.0 by default, while Filebeat v5.6.x supports Kafka v0.8.2.0 by default.
Check that the version setting in the configuration file matches the open-source version of your CKafka instance:
output.kafka:
  # Set the value to the open-source version of the CKafka instance
  version: 0.10.2
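To gauge how often the producer cycles through the states shown in the log excerpt above, you can tally the state changes per partition before and after correcting the version setting. The following Python sketch assumes the log line layout shown in that excerpt; the function name and the regular expression are illustrative and not part of Filebeat.

```python
import re
from collections import Counter

# Matches lines such as:
# "... producer/leader/wp-news-filebeat/5 state change to [retrying-3]"
STATE_CHANGE = re.compile(
    r"producer/leader/(?P<topic>\S+)/(?P<partition>\d+) "
    r"state change to \[(?P<state>[a-z]+)-\d+\]"
)


def count_leader_state_changes(log_lines):
    """Count leader state changes keyed by (topic, partition, state)."""
    counts = Counter()
    for line in log_lines:
        match = STATE_CHANGE.search(line)
        if match:
            key = (match["topic"], int(match["partition"]), match["state"])
            counts[key] += 1
    return counts


# Lines copied from the log excerpt above.
sample_log = [
    "2019-03-20T08:55:02.198+0800 INFO kafka/log.go:53 producer/broker/544 starting up",
    "2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/5 state change to [retrying-3]",
    "2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/4 state change to [flushing-3]",
    "2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/2 state change to [retrying-2]",
]

counts = count_leader_state_changes(sample_log)
print(counts[("wp-news-filebeat", 5, "retrying")])  # 1
```

A steadily growing count of retrying entries suggests the producer keeps reconnecting, which is consistent with a version mismatch but does not prove it.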

Note

Do not set compression.codec when sending data to CKafka.
Gzip compression is not supported by default; to use it, submit a ticket. Gzip compression consumes a large amount of CPU, and using it causes all messages to be treated as InValid.
If the program does not run properly with LZ4 compression, a possible cause is an incorrect message format. The default message version of CKafka is v0.10.2, so you need to use message format v1.
How to set the message format version varies by Kafka client SDK. Consult the open-source community documentation for your client (such as the description for the C/C++ client).