tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

Confluent Go SDK

PDF
聚焦模式
字号
最后更新时间: 2026-01-20 15:59:40

背景

TDMQ CKafka 是一个分布式流处理平台,用于构建实时数据管道和流式应用程序。它具备高吞吐量、低延迟、可伸缩性和容错性等特性。
Sarama:Shopify 开发的一个 Kafka 库,提供了生产者、消费者、分区消费者等功能。该库的性能较好,社区支持也较为活跃。
Confluent-Kafka-Go:由 Confluent 开发的 Kafka 库,提供了高级 API,易于使用。该库基于 librdkafka C 库,性能非常优秀,但安装和使用略显复杂。
本文着重介绍上述 Confluent Go 客户端的关键参数、实践教程以及常见问题。

生产者实践

版本选择

在使用 Confluent Go SDK 时,可以通过配置参数 "bootstrap.servers" 来指定 Kafka 集群的地址,而 Broker 的版本则可以通过"api.version.request"参数来设置,这样 Confluent Go SDK 会在启动时自动检测 Broker 的版本。
config := &kafka.ConfigMap{
"bootstrap.servers": "localhost",
"api.version.request": true,
}

生产者参数与调优

生产者参数

Confluent Go 是基于 librdkafka 开发的,在使用 Confluent Go 客户端写入 Kafka 的时候,需要配置的参数会透传 librdkafka,主要涉及如下关键参数,相关的参数和默认值如下:

package main

import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
config := &kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"acks": "-1", //ack方式,默认值为-1
"client.id": "rdkafka", //客户端ID
"compression.type": "none", //指定压缩方式
"compression.level": "-1", //压缩等级
"batch.num.messages": "10000", //默认一个批次最多聚合10000条消息,构成MessageSet整批发送,提高性能
"batch.size": "1000000", //构成MessageSet整批大小限制,默认限制最多不超过1000000字节
"queue.buffering.max.ms": "5", //在构造消息批次(MessageSets)传输到Broker之前,默认延迟5ms攒批消息
"queue.buffering.max.messages": "100000", //Producer攒批发送中,总的消息数不能超过100000
"queue.buffering.max.kbytes": "1048576", //Producer攒批发送中,MessageSets
"message.send.max.retries": "2147483647", //重试次数,默认2147483647
"retry.backoff.ms": "100", //重试间隔时间,默认100ms
"socket.timeout.ms": "60000", //会话超时时间,默认60s
"max.in.flight.requests.per.connection": "1000000", //单个连接最多发送1000000
"max.in.flight": "1000000" //单个连接,该值是max.in.flight.requests.per.connection的alias,最多发送1000000
"socket.connection.setup.timeout.ms": "5000", // socket.connection.setup.timeout.ms 控制连接初始化超时时间,单位为毫秒,librdkafka 1.9以上版本可配置。
}

producer, err := kafka.NewProducer(config)
if err != nil {
panic(fmt.Sprintf("Failed to create producer: %s", err))
}

// 使用producer发送消息等操作...

// 关闭producer
producer.Close()
}

参数说明调优

关于 max.in.flight.requests.per.connection 参数优化
Confluent Go 是基于 librdkafka 开发的,其中 max.in.flight.requests.per.connection 在 librdkafka 中定义为单个连接能够并发发送请求的数量,默认值为1000000,max.in.flight 参数是 max.in.flight.requests.per.connection 的 alias 参数,两者表示意义一样。这个参数在标准的 JAVA SDK 里定义为 max.in.flight.requests.per.connection,默认值为5。该值过大容易造成服务端压力,从而引发稳定性问题,因此建议 Confluent Go 的 SDK 值与开源 Java 的 SDK 保持一致,默认值设置5。
max.in.flight.requests.per.connection:5
max.in.flight:5
关于 acks 参数优化
acks 参数用于控制生产者发送消息时的确认机制。该参数的默认值为-1,表示消息发送给 Leader Broker 后,Leader 确认以及相应的 Follower 消息都写入完成后才返回。acks 参数还有以下可选值:0,1,-1。在跨可用区场景,以及副本数较多的 Topic,acks 参数的取值会影响消息的可靠性和吞吐量。
在一些在线业务消息的场景下,吞吐量要求不大,可以将 acks 参数设置为-1,确保消息被所有副本接收和确认后才返回,从而提高消息的可靠性。
在日志采集等大数据或者离线计算的场景下,要求高吞吐(即每秒写入 Kafka 的数据量)的情况下,可以将 acks 设置为1,兼顾性能同时提高吞吐量。
关于 buffering 参数优化(缓存)
默认情况下,传输同等数据量的情况下,多次请求和一次请求的网络传输,一次请求传输能有效减少相关计算和网络资源,提高整体写入的吞吐量。
因此,可以通过这个参数设置优化客户端发送消息的吞吐能力。对于 Confluent kafka Go,默认提供5ms的攒批时间积攒消息。如果消息较小,可以适当增加queue.buffering.max.ms的时间。
关于压缩参数优化
Confluent Go 支持如下压缩参数:none, gzip, snappy, lz4, zstd。
在 Confluent Kafka Go 客户端中,支持以下几种压缩算法:
none:不使用压缩算法。
gzip:使用 GZIP 压缩算法。
snappy:使用 Snappy 压缩算法。
lz4:使用 LZ4 压缩算法。
zstd:使用 ZSTD 压缩算法。
要在 Producer 客户端中使用压缩算法,需要在创建生产者时设置 compression.type 参数。例如,要使用LZ4压缩算法,可以将 compression.type 设置为 lz4,虽然压缩算法的 CPU 压缩,和 CPU 解压缩,发生在客户端,是一种用计算换带宽的优化方式,但是由于 Broker 针对压缩消息存在校验行为会付出额外的计算成本,尤其是 Gzip 压缩,服务端的压缩计算成本会比较大,在某种程度上可能会出现得不偿失的情况,反而因为计算的增加导致 Broker 消息处理能力偏低,导致带宽吞吐更低。这种情况建议可以使用如下方式进行使用:
1. 在 Producer 端对消息数据独立压缩,生成压缩包数据:messageCompression,同时在消息的 key 存储压缩方式:
{"Compression","CompressionLZ4"}
2. 在 Producer 端将 messageCompression 当成正常消息发送。
3. 在 Consumer 端读取消息 key,获取使用的压缩方式,独立进行解压缩。

创建生产者实例

如果应用程序需要更高的吞吐量,则可以使用异步生产者,以提高消息的发送速度。同时,可以使用批量发送消息的方式,以减少网络开销和 IO 消耗。如果应用程序需要更高的可靠性,则可以使用同步生产者,以确保消息发送成功。同时,可以使用 ACK 确认机制和事务机制,以确保消息的可靠性和一致性。具体的参数调优参考生产者参数与调优。

package main

import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
// 配置Kafka Producer
p, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"acks": "1",
"compression.type": "none",
"batch.num.messages": "1000",
"max.in.flight.requests.per.connection":"5",
"max.in.flight":"5"
})
if err != nil {
fmt.Printf("Failed to create producer: %s\\n", err)
return
}

// 发送消息
for i := 0; i < 10; i++ {
topic := "test-topic"
value := fmt.Sprintf("hello world %d", i)
message := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(value),
}
p.Produce(message, nil)
}

// 关闭Kafka Producer
p.Flush(15 * 1000)
p.Close()
}

消费者实践

消费者参数与调优

消费者参数


package main

import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
// 配置Kafka Consumer
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "test-group",
"auto.offset.reset":"earliest",
"fetch.min.bytes":1, //最小拉取字节数
"fetch.max.bytes":52428800,//最大拉取字节数
"fetch.wait.max.ms":"500", //如果没有最新消费消息默认等待500ms
"enable.auto.commit":true, //是否支持自动提交位点,默认true
"auto.commit.interval.ms":5000,//自动提交位点间隔,默认5s
"max.poll.interval.ms": 300000,//Consumer 在两次 poll 操作之间的最大延迟。默认5分钟
"session.timeout.ms": 45000,//session时间,默认45s
"heartbeat.interval.ms": 3000,//心跳时间,默认3s
"socket.connection.setup.timeout.ms": 5000, // socket.connection.setup.timeout.ms 控制连接初始化超时时间,单位为毫秒,librdkafka 1.9以上版本可配置。
})
if err != nil {
fmt.Printf("Failed to create consumer: %s\\n", err)
return
}

// 订阅主题
c.SubscribeTopics([]string{"test-topic"}, nil)

// 手动提交位点
for {
ev := c.Poll(100)
if ev == nil {
continue
}

switch e := ev.(type) {
case *kafka.Message:
fmt.Printf("Received message: %s\\n", string(e.Value))
c.CommitMessage(e)
case kafka.Error:
fmt.Printf("Error: %v\\n", e)
}
}

// 关闭Kafka Consumer
c.Close()
}

参数说明与调优

1. max.poll.interval.ms 是 Kafka Consumer 的一个配置参数,它用于指定 Consumer 在两次 poll 操作之间的最大延迟。这个参数的主要作用是控制 Consumer 的 liveness,也就是判断 Consumer 是否还活着。如果 Consumer 在 max.poll.interval.ms 指定的时间内没有进行 poll 操作,那么 Kafka 认为这个 Consumer 已经挂掉,会触发 Consumer 的 rebalance 操作。这个参数的设置需要根据实际的消费速度来调整。如果设置得太小,可能会导致 Consumer 频繁地触发 rebalance 操作,增加了 Kafka 的负担;如果设置得太大,可能会导致 Consumer 在出现问题时不能及时被 Kafka 检测到,从而影响了消息的消费。
2. 一般消费主要是 rebalance 时间频繁和消费线程阻塞问题,参考以下说明参数优化:
2.1 session.timeout.ms:v0.10.2之前的版本可适当提高该参数值,需要大于消费一批数据的时间,但不要超过30s,建议设置为25s;而v0.10.2及其之后的版本,保持默认值10s即可。
2.2 heartbeat.interval.ms:默认3s,设置该值 需要小于session.timeout.ms/3。
2.3 max.poll.interval.ms:默认5分钟,如果分区数和消费者较多,建议适当调大该值。该值要大于<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)的值。
注意:
如果消息处理是同步处理,即拉取消息、处理、再拉取下一个消息,需要做如下改造:
根据需求调大 max.poll.interval.ms 时间。
针对处理时间大于 max.poll.interval.ms 请求处理时间进行监控,采样打印超时时间。
3. 针对自动提交位点请求,建议 auto.commit.interval.ms 时间不要低于1000ms,因为频率过高的位点请求会导致 Broker CPU 很高,影响其他正常服务的读写。

创建消费者实例

Confluent Go 提供订阅的模型创建消费者,其中在提交位点方面,提供手动提交位点和自动提交位点两种方式。

自动提交位点

自动提交位点:消费者在拉取消息后会自动提交位点,无需手动操作。这种方式的优点是简单易用,但是可能会导致消息重复消费或丢失。

package main

import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
// 配置Kafka Consumer
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "test-group",
"auto.offset.reset": "earliest",
"enable.auto.commit": true, //是否启用自动提交位点。设置为true,表示启用自动提交位点。
"auto.commit.interval.ms": 5000, //自动提交位点的间隔时间。设置为5000毫秒(即5秒),表示每5秒自动提交一次位点。
"max.poll.interval.ms": 300000,//Consumer在一次poll操作中最长的等待时间。设置为300000毫秒(即5分钟),表示Consumer在一次poll操作中最多等待5分钟
"session.timeout.ms": 10000,//指定Consumer与broker之间的会话超时时间,设置10秒
"heartbeat.interval.ms": 3000, //指定Consumer发送心跳消息的间隔时间。设置为3000毫秒(即3秒)
})
if err != nil {
fmt.Printf("Failed to create consumer: %s\\n", err)
return
}

// 订阅主题
c.SubscribeTopics([]string{"test-topic"}, nil)

// 自动提交位点
for {
ev := c.Poll(100)
if ev == nil {
continue
}

switch e := ev.(type) {
case *kafka.Message:
fmt.Printf("Received message: %s\\n", string(e.Value))
case kafka.Error:
fmt.Printf("Error: %v\\n", e)
}
}

// 关闭Kafka Consumer
c.Close()

手动提交位点

手动提交位点:消费者在处理完消息后需要手动提交位点。这种方式的优点是可以精确控制位点的提交,避免消息重复消费或丢失。但是需要注意,手动提交位点如果太频繁会导致 Broker CPU 很高,影响性能,随着消息量增加,CPU 消费会很高,影响正常 Broker 的其他功能,因此建议间隔一定消息提交位点。

package main

import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
// 配置Kafka Consumer
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "test-group",
"auto.offset.reset": "earliest",
"enable.auto.commit": false,
"max.poll.interval.ms": 300000,
"session.timeout.ms": 10000,
"heartbeat.interval.ms": 3000,
})
if err != nil {
fmt.Printf("Failed to create consumer: %s\\n", err)
return
}

// 订阅主题
c.SubscribeTopics([]string{"test-topic"}, nil)

// 手动提交位点
for {
ev := c.Poll(100)
if ev == nil {
continue
}

switch e := ev.(type) {
case *kafka.Message:
fmt.Printf("Received message: %s\\n", string(e.Value))
c.CommitMessage(e)
case kafka.Error:
fmt.Printf("Error: %v\\n", e)
}
}

// 关闭Kafka Consumer
c.Close()



帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈