filebeat.inputs:
- type: kafka
  # Service address + port. Public network port is 9096, private network port is 9095;
  # this example uses intranet consumption — adjust to your actual situation.
  hosts:
  - kafkaconsumer-${region}.cls.tencentyun.com:9095
  # The topic name provided by the CLS Kafka protocol consumption console.
  topics: "your consumption topics"
  group_id: "your consumer group name"
  # Username is the logset ID.
  username: "${logsetID}"
  # Password is the string SecretId#SecretKey — do not lose the '#'.
  # Use sub-account keys with least-privilege policies.
  password: "${SecretId}#${SecretKey}"
  sasl.mechanism: "PLAIN"

processors:
- decode_json_fields:
    fields: ["message"]
    target: ""
    overwrite_keys: true

output.file:
  path: /tmp
  filename: filebeat_data.log
  rotate_every_kb: 102400
  number_of_files: 7
input {
  kafka {
    # The topic name provided by the CLS Kafka protocol consumption console,
    # such as XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX; copy it from the console.
    topics => "Your consumption topics"
    # Service address + port. Public network port is 9096, private network port is 9095;
    # this example uses intranet consumption — adjust to your actual situation.
    bootstrap_servers => "kafkaconsumer-${region}.cls.tencentyun.com:9095"
    group_id => "your consumer group name"
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "PLAIN"
    # The username is the log collection (logset) ID, such as ca5cXXXXdd2e-4ac0af12-92d4b677d2c6.
    # The password is the string SecretId#SecretKey, such as
    # AKID********************************#XXXXuXtymIXT0Lac — be careful not to lose the '#'.
    # Use sub-account keys; when the root account authorizes the sub-account, follow the
    # principle of least privilege and scope the policy's action/resource to the minimum needed.
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${SecretId}#${SecretKey}';"
  }
}

output {
  stdout { codec => json }
}
a1.sources = source_kafka
a1.sinks = sink_local
a1.channels = channel1

# Configure source
a1.sources.source_kafka.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source_kafka.batchSize = 10
a1.sources.source_kafka.batchDurationMillis = 200000
# Service address + port. Public network port is 9096, private network port is 9095;
# this example uses intranet consumption — adjust to your actual situation.
a1.sources.source_kafka.kafka.bootstrap.servers = kafkaconsumer-${region}.cls.tencentyun.com:9095
# The topic name provided by the CLS Kafka protocol consumption console,
# such as XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX; copy it from the console.
a1.sources.source_kafka.kafka.topics = your consumption topics
# Replace with your consumer group name
a1.sources.source_kafka.kafka.consumer.group.id = your consumer group name
a1.sources.source_kafka.kafka.consumer.auto.offset.reset = earliest
a1.sources.source_kafka.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source_kafka.kafka.consumer.sasl.mechanism = PLAIN
# The username is the log collection (logset) ID, such as ca5cXXXXdd2e-4ac0af12-92d4b677d2c6.
# The password is the string SecretId#SecretKey, such as
# AKID********************************#XXXXuXtymIXT0Lac — be careful not to lose the '#'.
# It is recommended to use sub-account keys; when the root account authorizes the sub-account,
# follow the principle of least privilege and scope the policy's action/resource to the minimum needed.
# Note: jaas.config must end with a semicolon; an error is reported if it is missing.
a1.sources.source_kafka.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";

# Configure sink
a1.sinks.sink_local.type = logger

# Configure channel
a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

# Bind source and sink to channel
a1.sources.source_kafka.channels = channel1
a1.sinks.sink_local.channel = channel1
Feedback