tencent cloud


Uploading Log over Kafka

Last updated: 2022-10-13 14:36:07

    CLS allows you to upload logs to CLS by using Kafka Producer SDKs or other Kafka related agents.


    Using Kafka as a message pipeline is common in log applications. First, the open source collection client or the producer on the machine directly writes logs to be collected, and then provides them to the downstream, such as Spark and Flink, for consumption through the Kafka message pipeline. CLS has complete upstream and downstream capabilities of the Kafka message pipeline. The following describes the scenarios suitable for you to upload logs using the Kafka protocol. For more Kafka protocol consumption scenarios, see Kafka Real-time Consumption.

    • Scenario 1: You already have a self-built system based on open source collection and you do not want complex secondary modifications. Then you can upload logs to CLS by modifying configuration files.
      For example, if you have set up a log system using ELK, now you only need to modify the Filebeat or Logstash configuration file to configure the output destination (see Filebeat configuration) to CLS to implement convenient and simple log upload to CLS.
    • Scenario 2: If you want to use Kafka producers to collect and upload logs, you do not need to install collection agents.
      CLS allows you to use various Kafka producer SDKs to collect logs and upload the logs to CLS via the Kafka protocol. For more information, see SDK call examples in this document.

    Use Limits

    • Supported Kafka protocol versions: 0.11.0.X, 1.0.X, 1.1.X, 2.0.X, 2.1.X, 2.2.X, 2.3.X, 2.4.X, 2.5.X, 2.6.X, 2.7.X, 2.8.X
    • Supported compression modes: Gzip, Snappy, LZ4
    • Current authentication mode: SASL_PLAINTEXT
    • Upload over Kafka requires the RealtimeProducer permission. For more information, see Examples of Custom Access Policies.

    Configuration Methods

    To upload logs via Kafka, you need to set the following parameters:

    Parameter Description
    LinkType Currently, SASL_PLAINTEXT is supported.
    hosts Address of the initially connected cluster. For more information, see Service Entries.
    topic Log topic ID. Example: 76c63473-c496-466b-XXXX-XXXXXXXXXXXX
    username Logset ID. Example: 0f8e4b82-8adb-47b1-XXXX-XXXXXXXXXXXX
    password Password in the format of ${SecurityId}#${SecurityKey}. Example: XXXXXXXXXXXXXX#YYYYYYYY

    Service Entries

    RegionNetwork TypePort NumberService Entry
    GuangzhouPrivate network9095gz-producer.cls.tencentyun.com:9095
    Public network9096gz-producer.cls.tencentcs.com:9096

    This document uses the Guangzhou region as an example. The private and public domain names are identified by different ports. For other regions, replace the address prefixes. For more information, see here.


    Agent call examples

    Filebeat/Winlogbeat configuration

    enabled: true
    hosts: ["${region}-producer.cls.tencentyun.com:9095"] # TODO: Service address. The public network port is 9096, and the private network port is 9095.
    topic: "${topicID}" #  TODO: Topic ID
    version: ""
    compression: "${compress}"   # Configure the compression method. Valid values: `gzip`, `snappy`, `lz4`.
    username: "${logsetID}"
    password: "${SecurityId}#${SecurityKey}"

    Logstash example

    output {
    kafka {
      topic_id => "${topicID}"
      bootstrap_servers => "${region}-producer.cls.tencentyun.com:${port}"
      sasl_mechanism => "PLAIN"
      security_protocol => "SASL_PLAINTEXT"
      compression_type => "${compress}"
      sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${securityID}#${securityKEY}';"

    SDK call examples

    Golang SDK call example

    import (
    func main(){
      config := sarama.NewConfig()
       config.Net.SASL.Mechanism = "PLAIN"
      config.Net.SASL.Version = int16(1)
      config.Net.SASL.Enable = true
      config.Net.SASL.User = "${logsetID}"                        // TODO: Logset ID
      config.Net.SASL.Password = "${SecurityId}#${SecurityKey}"   // TODO: Format: ${SecurityId}#${SecurityKey}
      config.Producer.Return.Successes = true
      config.Producer.RequiredAcks = ${acks}                      // TODO: Select the acks value according to the use case
      config.Version = sarama.V1_1_0_0
      config.Producer.Compression = ${compress}                   // TODO: Configuration compression mode
       // TODO: Service address. The public network port is 9096, and the private network port is 9095.
      producer, err := sarama.NewSyncProducer([]string{"${region}-producer.cls.tencentyun.com:9095"}, config)
      if err != nil{
       msg := &sarama.ProducerMessage{
          Topic: "${topicID}", // TODO: Topic ID
          Value: sarama.StringEncoder("goland sdk sender demo"),
      // Send the messages
      for i := 0; i <= 5; i++ {
          partition, offset, err := producer.SendMessage(msg)
          if err != nil{
          fmt.Printf("send response; partition:%d, offset:%d\n", partition, offset)
       _ = producer.Close()

    Python SDK call example

    from kafka import KafkaProducer
    if __name__ == '__main__':
      produce = KafkaProducer(
          # TODO: Service address. The public network port is 9096, and the private network port is 9095.
          # TODO: Logset ID
          # TODO: Format: ${SecurityId}#${SecurityKey}
          api_version=(0, 11, 0),
          # TODO: Configuration compression mode
       for i in range(0, 5):
          # TODO: Topic ID of the sent message
          future = produce.send(topic="${topicID}", value=b'python sdk sender demo')
          result = future.get(timeout=10)

    Java SDK call example

    Maven dependencies:


    Sample code:

    import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    public class ProducerDemo {
       public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
           // 0. Set parameters
           Properties props = new Properties();
           // TODO: In use
           props.put("bootstrap.servers", "${region}-producer.cls.tencentyun.com:9095");
           // TODO: Set the following according to the actual business scenario 
           props.put("acks", ${acks});
           props.put("retries", ${retries});
           props.put("batch.size", ${batch.size});
           props.put("linger.ms", ${linger.ms});
           props.put("buffer.memory", ${buffer.memory});
           props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "${compress_type}"); // TODO: configuration compression mode
           props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
           props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("security.protocol", "SASL_PLAINTEXT");
           props.put("sasl.mechanism", "PLAIN");
           // TODO: The user name is logsetID, and the password is the combination of securityID and securityKEY: securityID#securityKEY.
                   "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${SecurityId}#${SecurityKey}';");
            // 1. Create a producer object.
           Producer<String, String> producer = new KafkaProducer<String, String>(props);
           // 2. Call the send method.
           Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("${topicID}", ${message}));
           RecordMetadata recordMetadata = meta.get(${timeout}, TimeUnit.MILLISECONDS);
           System.out.println("offset = " + recordMetadata.offset());
            // 3. Close the producer.

    SDK for C call example

    // https://github.com/edenhill/librdkafka - master
    #include <iostream>
    #include <librdkafka/rdkafka.h>
    #include <string>
    #include <unistd.h>
    #define BOOTSTRAP_SERVER "${region}-producer.cls.tencentyun.com:${port}"
    #define USERNAME "${logsetID}"
    #define PASSWORD "${SecurityId}#${SecurityKey}"
    #define TOPIC "${topicID}"
    #define ACKS "${acks}"
    #define COMPRESS_TYPE "${compress_type}"
    static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
      if (rkmessage->err) {
          fprintf(stdout, "%% Message delivery failed : %s\n", rd_kafka_err2str(rkmessage->err));
      } else {
          fprintf(stdout, "%% Message delivery successful %zu:%d\n", rkmessage->len, rkmessage->partition);
    int main(int argc, char **argv) {
      // 1. Initialize the configuration.
      rd_kafka_conf_t *conf = rd_kafka_conf_new();
       rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
       char errstr[512];
      if (rd_kafka_conf_set(conf, "bootstrap.servers", BOOTSTRAP_SERVER, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
          fprintf(stdout, "%s\n", errstr);
          return -1;
       if (rd_kafka_conf_set(conf, "acks", ACKS, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
          fprintf(stdout, "%s\n", errstr);
          return -1;
       if (rd_kafka_conf_set(conf, "compression.codec", COMPRESS_TYPE, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
          fprintf(stdout, "%s\n", errstr);
          return -1;
       // Set the authentication method.
      if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
          fprintf(stdout, "%s\n", errstr);
          return -1;
      if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
          fprintf(stdout, "%s\n", errstr);
          return -1;
      if (rd_kafka_conf_set(conf, "sasl.username", USERNAME, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
          fprintf(stdout, "%s\n", errstr);
          return -1;
      if (rd_kafka_conf_set(conf, "sasl.password", PASSWORD, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
          fprintf(stdout, "%s\n", errstr);
          return -1;
       // 2. Create a handler.
      rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
      if (!rk) {
          fprintf(stdout, "create produce handler failed: %s\n", errstr);
          return -1;
       // 3. Send data.
      std::string value = "test lib kafka ---- ";
      for (int i = 0; i < 100; ++i) {
          rd_kafka_resp_err_t err = rd_kafka_producev(
                  rk, RD_KAFKA_V_TOPIC(TOPIC),
                  RD_KAFKA_V_VALUE((void *) value.c_str(), value.size()),
                  RD_KAFKA_V_OPAQUE(nullptr), RD_KAFKA_V_END);
           if (err) {
              fprintf(stdout, "Failed to produce to topic : %s, error : %s", TOPIC, rd_kafka_err2str(err));
              if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                  rd_kafka_poll(rk, 1000);
                  goto retry;
          } else {
              fprintf(stdout, "send message to topic successful : %s\n", TOPIC);
           rd_kafka_poll(rk, 0);
       std::cout << "message flush final" << std::endl;
      rd_kafka_flush(rk, 10 * 1000);
       if (rd_kafka_outq_len(rk) > 0) {
          fprintf(stdout, "%d message were not deliverer\n", rd_kafka_outq_len(rk));
       return 0;

    SDK for C# call example

    * This demo only provides the easiest way of using the feature. The specific production needs to be implemented in combination with the call method.
    * During use, the TODO items in the demo need to be replaced with actual values.
    * Notes:
    *  1. This demo is verified based on Confluent.Kafka 1.8.2.
    *  2. The maximum value of `MessageMaxBytes` cannot exceed 5 MB.
    *  3. This demo adopts the sync mode for production. You can change to the async mode during use based on your business scenario.
    *  4. You can adjust other parameters during use as instructed at https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.ProducerConfig.html.
    * Confluent.Kafka reference: https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.html
    using Confluent.Kafka;
    namespace Producer
      class Producer
          private static void Main(string[] args)
              var config = new ProducerConfig
                  // TODO: Domain name. For more information, visit https://www.tencentcloud.com/document/product/614/18940. The private network port is 9095, and the public network port is 9096.
                  BootstrapServers = "${domain}:${port}", 
                  SaslMechanism = SaslMechanism.Plain,
                  SaslUsername = "${logsetID}", // TODO: Logset ID of the topic
                  SaslPassword = "${SecurityId}#${SecurityKey}", // TODO: UIN key of the topic
                  SecurityProtocol = SecurityProtocol.SaslPlaintext,
                  Acks         = Acks.None, // TODO: Assign a value based on the actual use case. Valid values: `Acks.None`, `Acks.Leader`, `Acks.All`.
                  MessageMaxBytes = 5242880 // TODO: The maximum size of the request message, which cannot exceed 5 MB.
               // deliveryHandler
              Action<DeliveryReport<Null, string>> handler =
                  r => Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error: {r.Error.Reason}");
                using (var produce = new ProducerBuilder<Null, string>(config).Build())
                      // TODO: Test verification code
                      for (var i = 0; i < 100; i++)
                          // TODO: Replace the log topic ID
                          produce.Produce("${topicID}", new Message<Null, string> { Value = "C# demo value" }, handler);
                  catch (ProduceException<Null, string> pe)
                      Console.WriteLine($"send message receiver error : {pe.Error.Reason}");
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support