tencent cloud

文档反馈

使用 Kafka 协议上传日志

最后更新时间:2024-01-20 17:14:28
    日志服务(Cloud Log Service,CLS)目前已支持使用 Kafka Producer SDK 和其他 Kafka 相关 agent 上传日志到 CLS。

    使用场景

    日志应用中使用 Kafka 作为消息管道是非常普遍的场景。先通过机器上的开源采集客户端或者使用 producer 直接写入收集日志,再通过消费管道提供给下游如 spark、flink 等进行消费。CLS 具备完整的 Kafka 数据管道上下行能力,以下主要介绍哪些场景适合您使用 Kafka 协议上传日志,更多 Kafka 协议消费场景请参考 Kafka 协议实时消费
    场景1:您已有基于开源采集的自建系统,不希望有复杂的二次改造,您可以通过修改配置文件将日志上传到 CLS。 例如,您之前使用 ELK 搭建日志系统的客户,现在只需要通过修改 Filebeat 或者 Logstash 的配置文件,将 Output 配置(详情请参考 filebeat 配置)到 CLS,即可非常方便简洁的将日志上传。
    场景2:您希望通过 Kafka producer 来采集日志并上传,不必再安装采集 Agent。 CLS 支持您使用各类 Kafka producer SDK 采集日志,并通过 Kafka 协议上传到 CLS。(详情请参考本文提供的 SDK 调用示例

    相关限制

    支持 Kafka 协议版本为:0.11.0.X,1.0.X,1.1.X,2.0.X,2.1.X,2.2.X,2.3.X,2.4.X,2.5.X,2.6.X,2.7.X,2.8.X。
    支持压缩方式:gzip,snappy,lz4。
    当前使用 SASL_PLAINTEXT 认证。
    使用 Kafka 协议上传需要配置 RealtimeProducer 权限,详情请参考 CLS 访问策略模板

    配置方式

    使用 kafka 协议上传日志时,需要配置一下参数:
    参数
    说明
    链接类型
    当前支持 SASL_PLAINTEXT
    hosts
    初始连接的集群地址,详细参见 服务入口
    topic
    配置为日志主题 ID。例如:76c63473-c496-466b-XXXX-XXXXXXXXXXXX
    username
    配置为日志集 ID。 例如:0f8e4b82-8adb-47b1-XXXX-XXXXXXXXXXXX
    password
    格式为 ${SecurityId}#${SecurityKey}。例如:XXXXXXXXXXXXXX#YYYYYYYY

    服务入口

    地域
    网络类型
    端口号
    服务入口
    广州
    内网
    9095
    gz-producer.cls.tencentyun.com:9095
    外网
    9096
    gz-producer.cls.tencentcs.com:9096
    注意:
    本文档以广州地域为例,内外网域名需用不同端口标识,其他地域请替换地址前缀。详情请参考 可用域名-Kafka上传日志

    示例

    Agent 调用示例

    filebeat/winlogbeat 配置

    output.kafka:
    enabled: true
    hosts: ["${region}-producer.cls.tencentyun.com:9095"] # TODO 服务地址;外网端口9096,内网端口9095
    topic: "${topicID}" # TODO topicID
    version: "0.11.0.2"
    compression: "${compress}" # 配置压缩方式,支持gzip,snappy,lz4;例如"lz4"
    username: "${logsetID}"
    password: "${SecurityId}#${SecurityKey}"

    logstash 示例

    output {
    kafka {
    topic_id => "${topicID}"
    bootstrap_servers => "${region}-producer.cls.tencentyun.com:${port}"
    sasl_mechanism => "PLAIN"
    security_protocol => "SASL_PLAINTEXT"
    compression_type => "${compress}"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${securityID}#${securityKEY}';"
    }
    }

    SDK 调用示例

    Golang SDK 调用示例

    import (
    "fmt"
    "github.com/Shopify/sarama"
    )
    
    func main() {
    config := sarama.NewConfig()
    
    config.Net.SASL.Mechanism = "PLAIN"
    config.Net.SASL.Version = int16(1)
    config.Net.SASL.Enable = true
    config.Net.SASL.User = "${logsetID}" // TODO 日志集 ID
    config.Net.SASL.Password = "${SecurityId}#${SecurityKey}" // TODO 格式为 ${SecurityId}#${SecurityKey}
    config.Producer.Return.Successes = true
    config.Producer.RequiredAcks = ${acks} // TODO 根据使用场景选择acks的值
    config.Version = sarama.V1_1_0_0
    config.Producer.Compression = ${compress} // TODO 配置压缩方式
    
    // TODO 服务地址;外网端口9096,内网端口9095
    producer, err := sarama.NewSyncProducer([]string{"${region}-producer.cls.tencentyun.com:9095"}, config)
    if err != nil {
    panic(err)
    }
    
    msg := &sarama.ProducerMessage{
    Topic: "${topicID}", // TODO topicID
    Value: sarama.StringEncoder("goland sdk sender demo"),
    }
    // 发送消息
    for i := 0; i <= 5; i++ {
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
    panic(err)
    }
    fmt.Printf("send response; partition:%d, offset:%d\\n", partition, offset)
    }
    
    _ = producer.Close()
    
    }

    Python SDK 调用示例

    from kafka import KafkaProducer
    
    if __name__ == '__main__':
    produce = KafkaProducer(
    # TODO 服务地址;外网端口9096,内网端口9095
    bootstrap_servers=["${region}-producer.cls.tencentyun.com:9095"],
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='PLAIN',
    # TODO 日志集 ID
    sasl_plain_username='${logsetID}',
    # TODO 格式为 ${SecurityId}#${SecurityKey}
    sasl_plain_password='${SecurityId}#${SecurityKey}',
    api_version=(0, 11, 0),
    # TODO 配置压缩方式
    compression_type="${compress_type}",
    )
    
    for i in range(0, 5):
    # 发送消息 TODO topicID
    future = produce.send(topic="${topicID}", value=b'python sdk sender demo')
    result = future.get(timeout=10)
    print(result)

    Java SDK 调用示例

    maven 依赖:
    <dependencies>
    <!--https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients-->
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.2</version>
    </dependency>
    </dependencies>
    代码示例:
    import org.apache.kafka.clients.producer.*;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class ProducerDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
    // 0.配置一系列参数
    Properties props = new Properties();
    // TODO 使用时
    props.put("bootstrap.servers", "${region}-producer.cls.tencentyun.com:9095");
    // TODO 以下值根据业务场景设置
    props.put("acks", ${acks});
    props.put("retries", ${retries});
    props.put("batch.size", ${batch.size});
    props.put("linger.ms", ${linger.ms});
    props.put("buffer.memory", ${buffer.memory});
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "${compress_type}"); // TODO 配置压缩方式
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");
    // TODO 用户名为logsetID;密码为securityID和securityKEY的组合 securityID#securityKEY
    props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${SecurityId}#${SecurityKey}';");
    
    // 1.创建一个生产者对象
    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    
    // 2.调用send方法
    Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("${topicID}", ${message}));
    RecordMetadata recordMetadata = meta.get(${timeout}, TimeUnit.MILLISECONDS);
    System.out.println("offset = " + recordMetadata.offset());
    
    // 3.关闭生产者
    producer.close();
    }
    }

    C SDK 调用示例

    // https://github.com/edenhill/librdkafka - master
    #include <iostream>
    #include <librdkafka/rdkafka.h>
    #include <string>
    #include <unistd.h>
    
    #define BOOTSTRAP_SERVER "${region}-producer.cls.tencentyun.com:${port}"
    #define USERNAME "${logsetID}"
    #define PASSWORD "${SecurityId}#${SecurityKey}"
    #define TOPIC "${topicID}"
    #define ACKS "${acks}"
    #define COMPRESS_TYPE "${compress_type}"
    
    static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
    fprintf(stdout, "%% Message delivery failed : %s\\n", rd_kafka_err2str(rkmessage->err));
    } else {
    fprintf(stdout, "%% Message delivery successful %zu:%d\\n", rkmessage->len, rkmessage->partition);
    }
    }
    
    int main(int argc, char **argv) {
    // 1. 初始化配置
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
    
    char errstr[512];
    if (rd_kafka_conf_set(conf, "bootstrap.servers", BOOTSTRAP_SERVER, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    rd_kafka_conf_destroy(conf);
    fprintf(stdout, "%s\\n", errstr);
    return -1;
    }
    
    if (rd_kafka_conf_set(conf, "acks", ACKS, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    rd_kafka_conf_destroy(conf);
    fprintf(stdout, "%s\\n", errstr);
    return -1;
    }
    
    if (rd_kafka_conf_set(conf, "compression.codec", COMPRESS_TYPE, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    rd_kafka_conf_destroy(conf);
    fprintf(stdout, "%s\\n", errstr);
    return -1;
    }
    
    // 设置认证方式
    if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    rd_kafka_conf_destroy(conf);
    fprintf(stdout, "%s\\n", errstr);
    return -1;
    }
    if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    rd_kafka_conf_destroy(conf);
    fprintf(stdout, "%s\\n", errstr);
    return -1;
    }
    if (rd_kafka_conf_set(conf, "sasl.username", USERNAME, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    rd_kafka_conf_destroy(conf);
    fprintf(stdout, "%s\\n", errstr);
    return -1;
    
    }
    if (rd_kafka_conf_set(conf, "sasl.password", PASSWORD, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    rd_kafka_conf_destroy(conf);
    fprintf(stdout, "%s\\n", errstr);
    return -1;
    }
    
    // 2. 创建 handler
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
    rd_kafka_conf_destroy(conf);
    fprintf(stdout, "create produce handler failed: %s\\n", errstr);
    return -1;
    }
    
    // 3. 发送数据
    std::string value = "test lib kafka ---- ";
    for (int i = 0; i < 100; ++i) {
    retry:
    rd_kafka_resp_err_t err = rd_kafka_producev(
    rk, RD_KAFKA_V_TOPIC(TOPIC),
    RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
    RD_KAFKA_V_VALUE((void *) value.c_str(), value.size()),
    RD_KAFKA_V_OPAQUE(nullptr), RD_KAFKA_V_END);
    
    if (err) {
    fprintf(stdout, "Failed to produce to topic : %s, error : %s", TOPIC, rd_kafka_err2str(err));
    if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
    rd_kafka_poll(rk, 1000);
    goto retry;
    }
    } else {
    fprintf(stdout, "send message to topic successful : %s\\n", TOPIC);
    }
    
    rd_kafka_poll(rk, 0);
    }
    
    std::cout << "message flush final" << std::endl;
    rd_kafka_flush(rk, 10 * 1000);
    
    if (rd_kafka_outq_len(rk) > 0) {
    fprintf(stdout, "%d message were not deliverer\\n", rd_kafka_outq_len(rk));
    }
    
    rd_kafka_destroy(rk);
    
    return 0;
    }
    

    C# SDK 调用示例

    /*
    * 该demo只提供了最简单的使用方法,具体生产还需要结合调用放来实现
    * 在使用过程中,demo中留的todo项需要替换使用
    *
    * 注意:
    * 1. 该Demo基于Confluent.Kafka/1.8.2版本验证通过
    * 2. MessageMaxBytes最大值不能超过5M
    * 3. 该demo使用同步的方式生产,在使用时也可根据业务场景调整为异步的方式
    * 4. 其他参数在使用过程中可以根据业务参考文档自己调整:https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.ProducerConfig.html
    *
    * Confluent.Kafka 参考文档:https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.html
    */
    
    
    using Confluent.Kafka;
    
    namespace Producer
    {
    class Producer
    {
    private static void Main(string[] args)
    {
    var config = new ProducerConfig
    {
    // todo 域名参考 https://www.tencentcloud.com/document/product/614/18940 填写,注意内网端口9095,公网端口9096
    BootstrapServers = "${domain}:${port}",
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = "${logsetID}", // todo topic所属日志集ID
    SaslPassword = "${SecurityId}#${SecurityKey}", // todo topic所属uin的密钥
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    Acks = Acks.None, // todo 根据实际使用场景赋值。可取值: Acks.None、Acks.Leader、Acks.All
    MessageMaxBytes = 5242880 // todo 请求消息的最大 大小,最大不能超过5M
    };
    
    // deliveryHandler
    Action<DeliveryReport<Null, string>> handler =
    r => Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error: {r.Error.Reason}");
    
    
    using (var produce = new ProducerBuilder<Null, string>(config).Build())
    {
    try
    {
    // todo 测试验证代码
    for (var i = 0; i < 100; i++)
    {
    // todo 替换日志主题ID
    produce.Produce("${topicID}", new Message<Null, string> { Value = "C# demo value" }, handler);
    }
    produce.Flush(TimeSpan.FromSeconds(10));
    
    }
    catch (ProduceException<Null, string> pe)
    {
    Console.WriteLine($"send message receiver error : {pe.Error.Reason}");
    }
    }
    }
    }
    }
    
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持