tencent cloud

Consumption Demo - multi-language SDK
Last updated: 2025-12-03 18:30:48
Consumption Demo - multi-language SDK
Last updated: 2025-12-03 18:30:48
This document describes how to consume CLS logs over the Kafka protocol using the Python, Java, and Go SDKs.

Python SDK

Note:
It is recommended to use Python version 3.9 or higher.
Python Kafka client: kafka-python, kafka-python-ng, confluent-kafka-python.
If you configured a data compression format, make sure the matching package is installed: the python-snappy package for SNAPPY, or the lz4 package for LZ4.

Single Consumer

import uuid
from kafka import KafkaConsumer,TopicPartition,OffsetAndMetadata
# Single-consumer example: create one KafkaConsumer against the CLS
# Kafka-protocol consumption endpoint and print every message it receives.
consumer = KafkaConsumer(
    # Topic name provided by the CLS Kafka protocol consumption console, such as
    # XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX; it can be copied from the console.
    'Your consumption topics',
    group_id='your consumer group name',
    auto_offset_reset='earliest',
    # Service address + port: public network port 9096, private network port 9095.
    # This example uses intranet consumption; fill in according to your actual situation.
    bootstrap_servers=['kafkaconsumer-${region}.cls.tencentyun.com:9095'],
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism='PLAIN',
    # The username is the logset ID, such as ca5cXXXXdd2e-4ac0af12-92d4b677d2c6.
    sasl_plain_username="${logsetID}",
    # The password is a string composed of the user's SecretId#SecretKey, such as
    # AKID********************************#XXXXuXtymIXT0Lac. Be careful not to
    # lose the #. Use sub-account keys. When the root account authorizes the
    # sub-account, follow the principle of least privilege: the actions and
    # resources in the sub-account access policy should be configured to the
    # minimum range needed to fulfill the operations.
    sasl_plain_password="${SecretId}#${SecretKey}",
    api_version=(0, 10, 1),
)

print('begin')
# Iterating over the consumer yields one message at a time.
for message in consumer:
    print('begins')
    print("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))
# Runs only once the loop above exits.
print('end')

Multiple Consumers

from kafka import KafkaConsumer
import threading

# Placeholders for the topic to consume and the consumer group name;
# replace them with your own values.
TOPIC_NAME = 'Your consumption topics'
GROUP_ID = 'your consumer group name'
# Service address + port: public network port 9096, private network port 9095.
# This example uses intranet consumption; fill in according to your actual situation.
BOOTSTRAP_SERVERS = 'kafkaconsumer-${region}.cls.tencentyun.com:9095'

def consume_messages(thread_id):
    """Consume messages from TOPIC_NAME and print them, tagged with *thread_id*.

    Creates a dedicated KafkaConsumer in the GROUP_ID consumer group, prints
    the partition, offset and UTF-8 decoded value of every message received,
    and closes the consumer when the loop is left.

    Args:
        thread_id: Identifier included in each printed line so output from
            different consumer threads can be told apart.
    """
    # Create a Kafka consumer instance
    consumer = KafkaConsumer(
        TOPIC_NAME,
        group_id=GROUP_ID,
        bootstrap_servers=BOOTSTRAP_SERVERS,
        # Message values arrive as bytes; decode them to text.
        value_deserializer=lambda m: m.decode('utf-8'),
        auto_offset_reset='earliest',
        security_protocol="SASL_PLAINTEXT",
        sasl_mechanism='PLAIN',
        sasl_plain_username="${logsetID}",
        sasl_plain_password="${SecretId}#${SecretKey}",
        api_version=(2, 5, 1),
    )

    try:
        for message in consumer:
            print(f"Thread {thread_id}: partition = {message.partition}, offset = {message.offset}, value = {message.value}")
    except KeyboardInterrupt:
        # Interruption is treated as a normal stop request, not an error.
        pass
    finally:
        # Stop the consumer.
        consumer.close()

if __name__ == "__main__":
    # Start 3 consumer threads. This is an example. Please configure according
    # to actual conditions.
    num_consumers = 3
    threads = []
    for i in range(num_consumers):
        # Each thread runs consume_messages with its index as the thread id.
        thread = threading.Thread(target=consume_messages, args=(i,))
        threads.append(thread)
        thread.start()

    # Wait for all threads to complete
    for thread in threads:
        thread.join()

Java SDK

Note:
The Java code in the following example shows the jaas.config configuration: ${SecretId}#${SecretKey} must be followed by a semicolon (;). Do not leave any field out; otherwise, an error will be reported.
<!-- Maven dependency on the Apache Kafka Java client (kafka-clients) used by the Java example. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerGroupTest {

public static void consume() {
Properties props = new Properties();
String logset_id = "${logsetID}";
// Topic name for display on the page of kafka protocol consumption in the CLS console
String topic_id = "Your consumption topics";

String accessKeyID = System.getenv("${SecretId}");
String accessKeySecret = System.getenv("${SecretKey}");

String groupId = "your consumer group name";

// Service address + port, public network port 9096, private network port 9095, example is intranet consumption, please fill in according to your actual situation
String hosts = "kafkaconsumer-${region}.cls.tencentyun.com:9095";
props.put("bootstrap.servers", hosts);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\"" +
logset_id + "\\" password=\\"" + accessKeyID + "#" + accessKeySecret + "\\";");

// Kafka consumer configuration
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
props.put("session.timeout.ms", "10000");
props.put("auto.offset.reset", "earliest");
props.put("max.poll.interval.ms", "120000");
props.put("heartbeat.interval.ms", "3000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Create a Kafka consumer instance
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Collections.singletonList(topic_id));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
}
}
public static void main(String[] args){
consume();
}
}

Go SDK

Note:
The Java code in the following example shows the configuration in jaas.config: ${SecretId}#${SecretKey} is followed by (; semicolon). Do not leave any field out, otherwise an error will be reported.
package main

import (
"context"
"fmt"
"github.com/Shopify/sarama"
"log"
"os"
"os/signal"
"syscall"
)

// main configures a Sarama consumer group with SASL authentication, consumes
// the topic in a background goroutine, and blocks until SIGINT or SIGTERM is
// received.
func main() {
	// Create Sarama consumer configuration
	// TOPIC_NAME is your consumption topic, view it in the console.
	topicName := "${TOPIC_NAME}"
	// GROUP_ID is your consumer group name
	groupID := "${GROUP_ID}"
	// BOOTSTRAP_SERVERS is the consumption service host address and port, public network port 9096, private network port 9095, such as kafkaconsumer-${region}.cls.tencentyun.com:9095
	endpoint := "${BOOTSTRAP_SERVERS}"
	config := sarama.NewConfig()
	config.Net.SASL.Enable = true
	// The SASL user is the logset ID; the password is SecretId#SecretKey.
	config.Net.SASL.User = "${logsetID}"
	config.Net.SASL.Password = "${SecretId}#${SecretKey}"
	config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	config.Version = sarama.V1_1_1_0
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	// Create Sarama consumer
	sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
	consumer, err := sarama.NewConsumerGroup([]string{endpoint}, groupID, config)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Process received messages
	handler := &ConsumerGroupHandler{}
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		// Call Consume again each time it returns; stop only if the handler
		// is still marked ready after Consume has returned.
		for {
			err := consumer.Consume(context.Background(), []string{topicName}, handler)
			if err != nil {
				log.Fatal(err)
			}
			if handler.ready {
				break
			}
		}
	}()

	// Block until a termination signal arrives.
	<-signals
	fmt.Println("Exiting...")
}

// ConsumerGroupHandler implements the sarama.ConsumerGroupHandler API.
// Its ready flag is set by Setup and cleared by Cleanup.
type ConsumerGroupHandler struct {
	ready bool
}

// Setup is called before the consumer group starts up; it marks the handler
// as ready and never fails.
func (handler *ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
	handler.ready = true
	return nil
}

// Cleanup is called after the consumer group stops; it clears the ready flag
// and never fails.
func (handler *ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	handler.ready = false
	return nil
}

// ConsumeClaim consumes messages from the claim: it prints each message value
// on its own line and marks the message on the session. It returns nil once
// the claim's message channel is closed.
func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		// "\n" (not "\\n") so that each message ends with a real newline.
		fmt.Printf("Received message: %s\n", string(message.Value))
		session.MarkMessage(message, "")
	}
	return nil
}

Was this page helpful?
You can also Contact Sales or Submit a Ticket for help.
Yes
No

Feedback