tencent cloud

日志服务

动态与公告
产品动态
公告
新手指引
产品简介
产品概述
产品优势
地域和访问域名
规格与限制
基本概念
购买指南
计费概述
产品定价
按量计费(后付费)
欠费说明
清理日志服务资源
成本优化
常见问题
快速入门
一分钟入门指南
入门指南
使用 Demo 日志快速体验 CLS
操作指南
资源管理
权限管理
日志采集
指标采集
日志存储
指标存储
检索分析(日志主题)
检索分析(指标主题)
仪表盘
数据处理
投递与消费
监控告警
云产品中心
DataSight 独立控制台
历史文档
实践教程
日志采集
检索分析
仪表盘
监控告警
投递和消费
成本优化
开发者指南
通过 iframe 内嵌 CLS(旧方案)
通过 Grafana 使用 CLS
API 文档
History
Introduction
API Category
Making API Requests
Topic Management APIs
Log Set Management APIs
Index APIs
Topic Partition APIs
Machine Group APIs
Collection Configuration APIs
Log APIs
Metric APIs
Alarm Policy APIs
Data Processing APIs
Kafka Protocol Consumption APIs
CKafka Shipping Task APIs
Kafka Data Subscription APIs
COS Shipping Task APIs
SCF Delivery Task APIs
Scheduled SQL Analysis APIs
COS Data Import Task APIs
Data Types
Error Codes
常见问题
健康监测问题解释
采集相关
检索分析相关
其他问题
服务等级协议
CLS 政策
隐私协议
数据处理和安全协议
联系我们
词汇表

消费 Demo-多语言 SDK

PDF
聚焦模式
字号
最后更新时间: 2025-12-03 18:30:49
本文介绍使用 Python、Java 消费 CLS 日志。

Python SDK

说明:
推荐您使用 Python 版本:3.9及以上。
Python kafka 客户端:kafka-python、kafka-python-ng、confluent-kafka-python。
如果您配置了数据压缩格式:格式 SNAPPY,请确认安装了 python-snappy 包;格式 LZ4,确认安装了 lz4 包。

单个消费者

import uuid
from kafka import KafkaConsumer,TopicPartition,OffsetAndMetadata
consumer = KafkaConsumer(
# cls kafka 协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
'您的消费主题',
group_id = '您的消费组名称',
auto_offset_reset='earliest',
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
bootstrap_servers = ['kafkaconsumer-${region}.cls.tencentyun.com:9095'],
security_protocol = "SASL_PLAINTEXT",
sasl_mechanism = 'PLAIN',
# 用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
sasl_plain_username = "${logsetID}",
# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可。
sasl_plain_password = "${SecretId}#${SecretKey}",
api_version = (0,10,1)
)
print('begin')
for message in consumer:
print('begins')
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))
print('end')

多个消费者

from kafka import KafkaConsumer
import threading

TOPIC_NAME = '您的消费主题'
GROUP_ID = '您的消费组名称'
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
BOOTSTRAP_SERVERS = ''kafkaconsumer-${region}.cls.tencentyun.com:9095''

def consume_messages(thread_id):
# 创建 Kafka 消费者实例
consumer = KafkaConsumer(
TOPIC_NAME,
group_id=GROUP_ID,
bootstrap_servers=BOOTSTRAP_SERVERS,
value_deserializer=lambda m: m.decode('utf-8'),
auto_offset_reset='earliest',
security_protocol = "SASL_PLAINTEXT",
sasl_mechanism = 'PLAIN',
sasl_plain_username = "${logsetID}",
sasl_plain_password = "${SecretId}#${SecretKey}",
api_version = (2, 5, 1)
)

try:
for message in consumer:
print(f"Thread {thread_id}: partition = {message.partition}, offset = {message.offset}, value = {message.value}")
except KeyboardInterrupt:
pass
finally:
# 关闭消费者
consumer.close()

if __name__ == "__main__":
# 启动3个消费者线程,这是个例子,请您根据实际情况配置
num_consumers = 3
threads = []
for i in range(num_consumers):
thread = threading.Thread(target=consume_messages, args=(i,))
threads.append(thread)
thread.start()

# 等待所有线程结束
for thread in threads:
thread.join()

Java SDK

注意:
下面的例子中的 Java 代码,在 jaas.config 的配置中,${SecretId}#${SecretKey} 后有(; 分号),不要漏填,否则会报错。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerGroupTest {

public static void consume() {
Properties props = new Properties();
String logset_id = "${logsetID}";
// CLS 控制台 kafka 协议消费页面展示的主题名称
String topic_id = "您的消费主题";

String accessKeyID = System.getenv("${SecretId}");
String accessKeySecret = System.getenv("${SecretKey}");

String groupId = "您的消费组名称";

// 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
String hosts = "kafkaconsumer-${region}.cls.tencentyun.com:9095";
props.put("bootstrap.servers", hosts);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\"" +
logset_id + "\\" password=\\"" + accessKeyID + "#" + accessKeySecret + "\\";");

// Kafka 消费者配置
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
props.put("session.timeout.ms", "10000");
props.put("auto.offset.reset", "earliest");
props.put("max.poll.interval.ms", "120000");
props.put("heartbeat.interval.ms", "3000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 创建 Kafka 消费者实例
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Collections.singletonList(topic_id));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
}
}
public static void main(String[] args){
consume();
}
}

Go SDK

注意:
下面的例子中的 Java 代码,在 jaas.config 的配置中,${SecretId}#${SecretKey} 后有(; 分号),不要漏填,否则会报错。
package main

import (
"context"
"fmt"
"github.com/Shopify/sarama"
"log"
"os"
"os/signal"
"syscall"
)

func main() {
// 创建Sarama消费者配置
//TOPIC_NAME是您的消费主题,在控制台查看.
topicName := "${TOPIC_NAME}"
//GROUP_ID 是 您的消费组名称
groupID := "${GROUP_ID}"
//BOOTSTRAP_SERVERS 是消费服务地址+端口,外网端口9096,内网端口9095,例如 kafkaconsumer-${region}.cls.tencentyun.com:9095
endpoint := "${BOOTSTRAP_SERVERS"
config := sarama.NewConfig()
config.Net.SASL.Enable = true
config.Net.SASL.User = "${logsetID}"
config.Net.SASL.Password = "${SecretId}#${SecretKey}"
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Version = sarama.V1_1_1_0
config.Consumer.Offsets.Initial = sarama.OffsetNewest

// 创建Sarama消费者
sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
consumer, err := sarama.NewConsumerGroup([]string{endpoint}, groupID, config)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()

// 处理接收到的消息
handler := &ConsumerGroupHandler{}
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

go func() {
for {
err := consumer.Consume(context.Background(), []string{topicName}, handler)
if err != nil {
log.Fatal(err)
}
if handler.ready {
break
}
}
}()

<-signals
fmt.Println("Exiting...")
}

// ConsumerGroupHandler 实现了sarama.ConsumerGroupHandler接口
type ConsumerGroupHandler struct {
ready bool
}

// Setup 在消费者组启动之前调用
func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
h.ready = true
return nil
}

// Cleanup 在消费者组停止之后调用
func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
h.ready = false
return nil
}

// ConsumeClaim 消费Claim中的消息
func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
fmt.Printf("Received message: %s\\n", string(message.Value))
session.MarkMessage(message, "")
}
return nil
}


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈