Consumption Demo - Third-party Software
Last updated: 2025-12-03 18:30:48
This article describes how to consume CLS logs with Filebeat, Logstash, and Flume, and how to forward the consumed logs to a SIEM (Splunk, Devo) over HTTP.

Consuming CLS Logs with Filebeat

Note:
Filebeat 7.5.0 or later is recommended.

filebeat.inputs:
- type: kafka
  hosts:
    - kafkaconsumer-${region}.cls.tencentyun.com:9095
  topics: "your consumption topics"
  group_id: "your consumer group name"
  username: "${logsetID}"
  password: "${SecretId}#${SecretKey}"
  sasl.mechanism: "PLAIN"

processors:
  - decode_json_fields:
      fields: ["message"]
      target: ""
      overwrite_keys: true

output.file:
  path: /tmp
  filename: filebeat_data.log
  rotate_every_kb: 102400
  number_of_files: 7
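After saving the configuration (filebeat.yml is an assumed file name), you can run Filebeat in the foreground to verify consumption; with the output.file settings above, the consumed logs are written to /tmp/filebeat_data.log. A minimal sketch:

# Run Filebeat in the foreground with the configuration above
# (the binary path and filebeat.yml are assumptions; adjust to your install)
./filebeat -e -c filebeat.yml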


Consuming CLS Logs with Logstash

Note:
Logstash 8.0 or later is recommended.
input {
  kafka {
    # The topic name provided on the CLS Kafka protocol consumption console page,
    # e.g. XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX; it can be copied from the console.
    topics => "your consumption topics"
    # Service address + port: port 9096 for the public network, port 9095 for the private network.
    # This example uses private network consumption; fill in according to your actual situation.
    bootstrap_servers => "kafkaconsumer-${region}.cls.tencentyun.com:9095"
    group_id => "your consumer group name"
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "PLAIN"
    # The username is the logset ID, e.g. ca5cXXXXdd2e-4ac0af12-92d4b677d2c6.
    # The password is the string SecretId#SecretKey, e.g. AKID********************************#XXXXuXtymIXT0Lac.
    # Be careful not to omit the #. Use sub-account keys; when the root account authorizes the
    # sub-account, follow the principle of least privilege and restrict the action and resource in
    # the sub-account access policy to the minimum scope required for the operations.
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${SecretId}#${SecretKey}';"
  }
}
output {
  stdout { codec => json }
}
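After saving the configuration (cls_kafka.conf is an assumed file name), run Logstash against it; the consumed logs are printed to stdout as JSON:

# Run Logstash with the configuration above (cls_kafka.conf is an assumed file name)
bin/logstash -f cls_kafka.conf

To forward the consumed logs to a SIEM over HTTP, as mentioned at the beginning of this article, the stdout output can be replaced with Logstash's http output. The following is a minimal sketch for Splunk's HTTP Event Collector (HEC); the host, port, and token are placeholders, and a Devo HTTP endpoint can be targeted the same way by changing the url and headers:

output {
  http {
    # Splunk HEC endpoint (host, port, and token are placeholders)
    url => "https://your-splunk-host:8088/services/collector/event"
    http_method => "post"
    format => "json"
    headers => {
      "Authorization" => "Splunk your-hec-token"
    }
    # Wrap each consumed log line in the HEC event envelope
    mapping => {
      "event" => "%{message}"
    }
  }
}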


Consuming CLS Logs with Flume

If you need to deliver log data to a self-built HDFS or Kafka cluster, you can use Flume as a relay. See the following example for details.
Note:
Flume 1.9.0 or later is recommended.

Enable Kafka Consumption Protocol for Logs

See Operation Steps to enable the Kafka consumption protocol for your logs and obtain the service domain name and topic for consumption.
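Before wiring up Flume, you can optionally verify connectivity with the console consumer shipped with Apache Kafka. A minimal sketch, using the same placeholders (${region}, ${logsetID}, ${SecretId}, ${SecretKey}) as the configurations in this article; substitute real values by hand before running:

# Optional: verify Kafka protocol consumption with the Kafka console consumer
bin/kafka-console-consumer.sh \
  --bootstrap-server kafkaconsumer-${region}.cls.tencentyun.com:9095 \
  --consumer-property security.protocol=SASL_PLAINTEXT \
  --consumer-property sasl.mechanism=PLAIN \
  --consumer-property 'sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";' \
  --topic "your consumption topic" \
  --from-beginning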

Flume Configuration

a1.sources = source_kafka
a1.sinks = sink_local
a1.channels = channel1

# Configure Source
a1.sources.source_kafka.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source_kafka.batchSize = 10
a1.sources.source_kafka.batchDurationMillis = 200000
# Service address + port: port 9096 for the public network, port 9095 for the private network.
# This example uses private network consumption; fill in according to your actual situation.
a1.sources.source_kafka.kafka.bootstrap.servers = kafkaconsumer-${region}.cls.tencentyun.com:9095
# The topic name provided on the CLS Kafka protocol consumption console page,
# e.g. XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX; it can be copied from the console.
a1.sources.source_kafka.kafka.topics = your consumption topics
# Replace with your consumer group name
a1.sources.source_kafka.kafka.consumer.group.id = your consumer group name
a1.sources.source_kafka.kafka.consumer.auto.offset.reset = earliest
a1.sources.source_kafka.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source_kafka.kafka.consumer.sasl.mechanism = PLAIN
# The username is the logset ID, e.g. ca5cXXXXdd2e-4ac0af12-92d4b677d2c6.
# The password is the string SecretId#SecretKey, e.g. AKID********************************#XXXXuXtymIXT0Lac.
# Be careful not to omit the #. It is recommended to use sub-account keys; when the root account
# authorizes the sub-account, follow the principle of least privilege and restrict the action and
# resource in the sub-account access policy to the minimum scope required for the operations.
# Note that sasl.jaas.config must end with a semicolon, otherwise an error is reported.
a1.sources.source_kafka.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";

# Configure sink
a1.sinks.sink_local.type = logger

a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

# Bind source and sink to channel
a1.sources.source_kafka.channels = channel1
a1.sinks.sink_local.channel = channel1
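The logger sink above only prints the consumed events, which is useful for verification. After saving the configuration (cls-kafka.conf is an assumed file name), start the agent:

# Start the Flume agent with the configuration above (cls-kafka.conf is an assumed file name)
bin/flume-ng agent --conf conf --conf-file cls-kafka.conf --name a1 -Dflume.root.logger=INFO,console

To deliver the logs to a self-built HDFS cluster instead, replace the logger sink with Flume's HDFS sink. A minimal sketch, assuming a NameNode at namenode:8020 and a target path of /cls/logs (both placeholders):

# Minimal HDFS sink sketch (NameNode host/port and path are placeholders)
a1.sinks.sink_local.type = hdfs
a1.sinks.sink_local.hdfs.path = hdfs://namenode:8020/cls/logs/%Y-%m-%d
# Write raw events rather than SequenceFiles
a1.sinks.sink_local.hdfs.fileType = DataStream
# Use the agent's local time to resolve the %Y-%m-%d escape sequences
a1.sinks.sink_local.hdfs.useLocalTimeStamp = true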
