tencent cloud

日志服务

文档日志服务操作指南日志采集使用 Kafka 协议上传日志

使用 Kafka 协议上传日志

Download
聚焦模式
字号
最后更新时间: 2025-11-26 10:31:16
日志服务(Cloud Log Service,CLS)目前已支持使用 Kafka Producer SDK 和其他 Kafka 相关 agent 上传日志到 CLS。

使用场景

日志应用中使用 Kafka 作为消息管道是非常普遍的场景。如通过机器上的开源采集客户端或者使用 producer 直接写入日志,再通过消费管道提供给下游如 spark、flink 等进行消费。CLS 具备完整的 Kafka 数据管道上下行能力,以下主要介绍哪些场景适合您使用 Kafka 协议上传日志,更多 Kafka 协议消费场景请参见 Kafka 协议实时消费
场景1:您已有基于开源采集的自建系统,不希望有复杂的二次改造,您可以通过修改配置文件将日志上传到 CLS。 例如,您之前使用 ELK 搭建日志系统的客户,现在只需要通过修改 Filebeat 或者 Logstash 的配置文件,将 Output 配置(详情请参见 filebeat 配置)到 CLS,即可非常方便简洁地将日志上传。
场景2:您希望通过 Kafka producer SDK 来采集日志并上传,不必再安装采集 Agent。
CLS 支持您使用各类 Kafka producer SDK 采集日志,并通过 Kafka 协议上传到 CLS。详情请参见本文提供的 SDK 调用示例

相关限制

支持 Kafka 协议版本为:0.11.0.X、1.0.X、1.1.X、2.0.X、2.1.X、2.2.X、2.3.X、2.4.X、2.5.X、2.6.X、2.7.X、2.8.X、3.X.X。
支持压缩方式:gzip、snappy、lz4。
当前使用 SASL_PLAINTEXT 认证。
使用 Kafka 协议上传需要配置 RealtimeProducer 权限,详情请参见 CLS 访问策略模板

配置方式

使用 Kafka 协议上传日志至 CLS Kafka 生产者端时,需要配置下 CLS Kafka 访问信息。
参数
说明
鉴权机制
当前支持 SASL_PLAINTEXT。
hosts
CLS Kafka 地址,根据目标写入日志主题所在地域配置。请参见 CLS Kafka 地址
topic
CLS Kafka topic 名称,配置为日志主题 ID。例如:76c63473-c496-466b-XXXX-XXXXXXXXXXXX。
username
CLS Kafka 访问用户名,配置为日志集 ID。例如:0f8e4b82-8adb-47b1-XXXX-XXXXXXXXXXXX。
password
CLS Kafka 访问密码,格式为 ${secret_id}#${secret_key}。例如:XXXXXXXXXXXXXX#YYYYYYYY。密钥信息获取请前往 密钥获取。请确保密钥关联的账号具有相应的 Kafka 协议上传日志权限
若要匿名上传,格式为 topic_id#${日志主题 ID}。例如:topic_id#76c63473-c496-466b-XXXX-XXXX。
注意:
目标日志主题需开启匿名上传,并在匿名操作选择 Kafka 协议上传日志。详情请参见 日志主题

header
定义在通过 Kafka 协议上传时日志的解析行为。
json_remove_escape:对日志进行 JSON 解析时是否进行去转义,取值为 true、false,若不指定默认为 false。
time_key:日志中的指定时间字段,表示选择日志中的指定字段作为日志采集时间。
time_format:当配置了 time_key,需额外配置指定字段的时间解析格式,详情请参见 配置时间格式

CLS Kafka 地址

通过 Kafka 协议上传日志时,基于访问方式(内网/外网)以及目标日志主题所在地域,CLS Kafka 地址配置为如下。
说明:
地域(region)格式请参见 可用域名 - Kafka上传日志
访问方式:
内网访问:数据发起端为腾讯云服务器且服务器所在地域与目标日志主题一致。
外网访问:数据发起端为非腾讯云服务器或者服务器所在地域与目标日志主题不一致。
访问方式
CLS Kafka 地址
内网
${region}-producer.cls.tencentyun.com:9095
外网
${region}-producer.cls.tencentcs.com:9096

示例

Beat 调用示例

filebeat/winlogbeat 配置

注意:
不同的 filebeat 版本,请使用不同的配置内容。
filebeat版本 > 8.18
filebeat版本 ≤ 8.18
output.kafka:
enabled: true
hosts: ["${region}-producer.cls.tencentyun.com:9095"] # TODO 服务地址;外网端口9096,内网端口9095
topic: "${ClsTopicID}" # TODO 日志主题 ID
version: "1.0.0"
compression: "${compress}" # TODO 配置压缩方式,支持 gzip、snappy、lz4,例如"lz4"
username: "${ClslogsetID}" # TODO 日志集 ID
# 若要匿名上传,password: "topic_id#${日志主题 ID}"
password: "${secret_id}#${secret_key}"
output.kafka:
enabled: true
hosts: ["${region}-producer.cls.tencentyun.com:9095"] # TODO 服务地址;外网端口9096,内网端口9095
topic: "${ClsTopicID}" # TODO 日志主题 ID
version: "0.11.0.2"
compression: "${compress}" # TODO 配置压缩方式,支持gzip、snappy、lz4,例如"lz4"
username: "${ClslogsetID}" # TODO 日志集 ID
# 若要匿名上传,password: "topic_id#${日志主题 ID}"
password: "${secret_id}#${secret_key}"
若出现报错 kafka: client has run out of available brokers to talk to,建议将 version 的版本提升至1.0.0。

Logstash 调用示例

logstash 配置

output {
kafka {
topic_id => "${ClstopicID}"
bootstrap_servers => "${region}-producer.cls.tencentyun.com:${port}"
sasl_mechanism => "PLAIN"
security_protocol => "SASL_PLAINTEXT"
compression_type => "${compress}"
# 若要匿名上传,password='topic_id#${日志主题 ID}'
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${ClslogsetID}' password='${secret_id}#${secret_key}';"
codec => json
}
}

FluentTD 调用示例

注意:
该 demo 基于 fluentd-1.15.3版本验证;依赖的 ruby 版本 ruby=2.7.6。

FluentTD 配置

<match *>
@type rdkafka2

# brokers setting
# TODO 域名参考 https://cloud.tencent.com/document/product/614/18940,注意内网端口9095,公网端口9096
brokers "${domain}:${port}" # 例如 gz-producer.cls.tencentyun.com:9095

# topic settings
# TODO 替换日志主题 ID
topic "${topic_id}"

# sasl
rdkafka_options {
"sasl.mechanism": "PLAIN",
"security.protocol": "sasl_plaintext",
# TODO topic 所属日志集 ID
"sasl.username": "${logset_id}",
# TODO topic 所属 uin 的密钥,格式为 ${secret_id}#${secret_key};若要匿名上传,格式为 topic_id#${日志主题 ID}
"sasl.password": "${secret_id}#${secret_key}"
}

required_acks 1
compression_codec gzip

<format>
@type json
</format>

<buffer tag>
flush_at_shutdown true
flush_mode interval
flush_interval 1s
chunk_limit_size 3MB
chunk_full_threshold 1
total_limit_size 1024MB
overflow_action block
</buffer>
</match>

FluentBit 调用示例

FluentBit 配置

[OUTPUT]
Name kafka
Match *
# TODO 域名参考 https://cloud.tencent.com/document/product/614/18940,注意内网端口9095,公网端口9096
Brokers ${domain}:${port} # 例如 gz-producer.cls.tencentyun.com:9095
# TODO 替换日志主题 ID
Topics ${topic_id}
# TODO 请求消息的最大大小,最大不能超过5M
rdkafka.message.max.bytes 5242880
rdkafka.sasl.mechanisms PLAIN
rdkafka.security.protocol sasl_plaintext
# TODO 根据使用场景选择 acks 的值
rdkafka.acks 1
# TODO 配置压缩方式
rdkafka.compression.codec lz4
# TODO topic 所属日志集 ID
rdkafka.sasl.username ${logset_id}
# TODO topic 所属 uin 的密钥,格式为 ${secret_id}#${secret_key};若要匿名上传,格式为 topic_id#${日志主题 ID}
rdkafka.sasl.password ${secret_id}#${secret_key}

SDK 调用示例

Golang SDK 调用示例

以下以 sarama.V1_1_0_0为例,其他版本按照类似规则配置:
import (
"fmt"
"github.com/Shopify/sarama"
)

func main() {
config := sarama.NewConfig()

config.Net.SASL.Mechanism = "PLAIN"
config.Net.SASL.Version = int16(1)
config.Net.SASL.Enable = true
// TODO 日志集 ID
config.Net.SASL.User = "${logsetID}"
// TODO 格式为 ${secret_id}#${secret_key},若要匿名上传,格式为 topic_id#${日志主题 ID}
config.Net.SASL.Password = "${secret_id}#${secret_key}"
config.Producer.Return.Successes = true
// TODO 根据使用场景选择 acks 的值
config.Producer.RequiredAcks = ${acks}
config.Version = sarama.V1_1_0_0
// TODO 配置压缩方式
config.Producer.Compression = ${compress}

// TODO 服务地址;外网端口9096,内网端口9095
producer, err := sarama.NewSyncProducer([]string{"${region}-producer.cls.tencentyun.com:9095"}, config)
if err != nil {
panic(err)
}

msg := &sarama.ProducerMessage{
Topic: "${topicID}", // TODO 日志主题 ID
Value: sarama.StringEncoder("goland sdk sender demo"),
}
// 发送消息
for i := 0; i <= 5; i++ {
partition, offset, err := producer.SendMessage(msg)
if err != nil {
panic(err)
}
fmt.Printf("send response; partition:%d, offset:%d\\n", partition, offset)
}

_ = producer.Close()

}

Python SDK 调用示例

from kafka import KafkaProducer

if __name__ == '__main__':
produce = KafkaProducer(
# TODO 服务地址;外网端口9096,内网端口9095
bootstrap_servers=["${region}-producer.cls.tencentyun.com:9095"],
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='PLAIN',
# TODO 日志集 ID
sasl_plain_username='${logsetID}',
# TODO 格式为 ${secret_id}#${secret_key}, 若要匿名上传,格式为 topic_id#${日志主题 ID}
sasl_plain_password='${secret_id}#${secret_key}',
api_version=(0, 11, 0),
# TODO 配置压缩方式
compression_type="${compress_type}",
)

for i in range(0, 5):
# 发送消息 TODO 日志主题 ID
future = produce.send(topic="${topicID}", value=b'python sdk sender demo')
result = future.get(timeout=10)
print(result)

Java SDK 调用示例

maven 依赖:
<dependencies>
<!--https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.2</version>
</dependency>
</dependencies>
代码示例:
import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ProducerDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
// 0.配置一系列参数
Properties props = new Properties();
// TODO 使用时
props.put("bootstrap.servers", "${region}-producer.cls.tencentyun.com:9095");
// TODO 以下值根据业务场景设置
props.put("acks", ${acks});
props.put("retries", ${retries});
props.put("batch.size", ${batch.size});
props.put("linger.ms", ${linger.ms});
props.put("buffer.memory", ${buffer.memory});
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "${compress_type}"); // TODO 配置压缩方式
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
// TODO 用户名为 logsetID;密码为 secret_id 和 secret_key 的组合,格式为 ${secret_id}#${secret_key},
// 若要匿名上传,密码为 topic_id#${日志主题 ID}
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${secret_id}#${secret_key}';");

// 1.创建一个生产者对象
Producer<String, String> producer = new KafkaProducer<String, String>(props);

// 2.调用 send 方法 TODO 日志主题 ID
Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("${topicID}", ${message}));
RecordMetadata recordMetadata = meta.get(${timeout}, TimeUnit.MILLISECONDS);
System.out.println("offset = " + recordMetadata.offset());

// 3.关闭生产者
producer.close();
}
}

C SDK 调用示例

// https://github.com/edenhill/librdkafka - master
#include <iostream>
#include <librdkafka/rdkafka.h>
#include <string>
#include <unistd.h>

#define BOOTSTRAP_SERVER "${region}-producer.cls.tencentyun.com:${port}"
// USERNAME 为日志集 ID
#define USERNAME "${logsetID}"
// PASSWORD 格式为 ${secret_id}#${secret_key}, 若要匿名上传,格式为 topic_id#${日志主题 ID}
#define PASSWORD "${secret_id}#${secret_key}"
// 日志主题 ID
#define TOPIC "${topicID}"
#define ACKS "${acks}"
// 配置压缩方式
#define COMPRESS_TYPE "${compress_type}"

static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err) {
fprintf(stdout, "%% Message delivery failed : %s\\n", rd_kafka_err2str(rkmessage->err));
} else {
fprintf(stdout, "%% Message delivery successful %zu:%d\\n", rkmessage->len, rkmessage->partition);
}
}

int main(int argc, char **argv) {
// 1. 初始化配置
rd_kafka_conf_t *conf = rd_kafka_conf_new();

rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

char errstr[512];
if (rd_kafka_conf_set(conf, "bootstrap.servers", BOOTSTRAP_SERVER, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
rd_kafka_conf_destroy(conf);
fprintf(stdout, "%s\\n", errstr);
return -1;
}

if (rd_kafka_conf_set(conf, "acks", ACKS, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
rd_kafka_conf_destroy(conf);
fprintf(stdout, "%s\\n", errstr);
return -1;
}

if (rd_kafka_conf_set(conf, "compression.codec", COMPRESS_TYPE, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
rd_kafka_conf_destroy(conf);
fprintf(stdout, "%s\\n", errstr);
return -1;
}

// 设置认证方式
if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
rd_kafka_conf_destroy(conf);
fprintf(stdout, "%s\\n", errstr);
return -1;
}
if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
rd_kafka_conf_destroy(conf);
fprintf(stdout, "%s\\n", errstr);
return -1;
}
if (rd_kafka_conf_set(conf, "sasl.username", USERNAME, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
rd_kafka_conf_destroy(conf);
fprintf(stdout, "%s\\n", errstr);
return -1;

}
if (rd_kafka_conf_set(conf, "sasl.password", PASSWORD, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
rd_kafka_conf_destroy(conf);
fprintf(stdout, "%s\\n", errstr);
return -1;
}

// 2. 创建 handler
rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
rd_kafka_conf_destroy(conf);
fprintf(stdout, "create produce handler failed: %s\\n", errstr);
return -1;
}

// 3. 发送数据
std::string value = "test lib kafka ---- ";
for (int i = 0; i < 100; ++i) {
retry:
rd_kafka_resp_err_t err = rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC(TOPIC),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE((void *) value.c_str(), value.size()),
RD_KAFKA_V_OPAQUE(nullptr), RD_KAFKA_V_END);

if (err) {
fprintf(stdout, "Failed to produce to topic : %s, error : %s", TOPIC, rd_kafka_err2str(err));
if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
rd_kafka_poll(rk, 1000);
goto retry;
}
} else {
fprintf(stdout, "send message to topic successful : %s\\n", TOPIC);
}

rd_kafka_poll(rk, 0);
}

std::cout << "message flush final" << std::endl;
rd_kafka_flush(rk, 10 * 1000);

if (rd_kafka_outq_len(rk) > 0) {
fprintf(stdout, "%d message were not deliverer\\n", rd_kafka_outq_len(rk));
}

rd_kafka_destroy(rk);

return 0;
}

C# SDK 调用示例

/*
* 该 demo 只提供了最简单的使用方法,具体生产还需要结合调用放来实现
* 在使用过程中,Demo 中留的 todo 项需要替换使用
*
* 注意:
* 1. 该 Demo 基于 Confluent.Kafka/1.8.2版本验证通过
* 2. MessageMaxBytes 最大值不能超过5M
* 3. 该 Demo 使用同步的方式生产,在使用时也可根据业务场景调整为异步的方式
* 4. 其他参数在使用过程中可以根据业务参考文档自己调整:https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.ProducerConfig.html
*
* Confluent.Kafka 参考文档:https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.html
*/


using Confluent.Kafka;

namespace Producer
{
class Producer
{
private static void Main(string[] args)
{
var config = new ProducerConfig
{
// TODO 域名参考 https://www.tencentcloud.com/document/product/614/18940?from_cn_redirect=1
// Kafka 填写,注意内网端口9095,公网端口9096
BootstrapServers = "${domain}:${port}",
SaslMechanism = SaslMechanism.Plain,
// TODO topic 所属日志集 ID
SaslUsername = "${logsetID}",
// TODO topic 所属 uin 的密钥,格式为 ${secret_id}#${secret_key},
// 若要匿名上传,格式为 topic_id#${日志主题 ID}
SaslPassword = "${secret_id}#${secret_key}",
SecurityProtocol = SecurityProtocol.SaslPlaintext,
// TODO 根据实际使用场景赋值。可取值: Acks.None、Acks.Leader、Acks.All
Acks = Acks.None,
// TODO 请求消息的最大大小,最大不能超过5M
MessageMaxBytes = 5242880
};

// deliveryHandler
Action<DeliveryReport<Null, string>> handler =
r => Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error: {r.Error.Reason}");


using (var produce = new ProducerBuilder<Null, string>(config).Build())
{
try
{
// TODO 测试验证代码
for (var i = 0; i < 100; i++)
{
// TODO 替换日志主题 ID
produce.Produce("${topicID}", new Message<Null, string> { Value = "C# demo value" }, handler);
}
produce.Flush(TimeSpan.FromSeconds(10));

}
catch (ProduceException<Null, string> pe)
{
Console.WriteLine($"send message receiver error : {pe.Error.Reason}");
}
}
}
}
}



帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈