tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

Sarama Go

PDF
聚焦模式
字号
最后更新时间: 2026-01-20 15:59:40

背景

TDMQ CKafka 是一个分布式流处理平台,用于构建实时数据管道和流式应用程序。它具备高吞吐量、低延迟、可伸缩性和容错性等特性。
Sarama:Shopify 开发的一个 Kafka 库,提供了生产者、消费者、分区消费者等功能。该库的性能较好,社区支持也较为活跃。
Confluent-Kafka-Go:由 Confluent 开发的 Kafka 库,提供了高级 API,易于使用。该库基于 librdkafka C 库,性能非常优秀,但安装和使用略显复杂。
本文着重介绍上述 Sarama Go 客户端的关键参数、最佳实践以及常见问题。

生产者实践

版本选择

在选择 Sarama 客户端版本时,需要确保所选版本与 Kafka broker 版本兼容。Sarama 库支持多个 Kafka 协议版本,可以通过设置 config.Version 来指定使用的协议版本。常见的 Kafka 协议版本与 Sarama 库版本的对应关系如下,目前最新版本请参见 Sarama 版本
Kafka 版本
Sarama 库版本
Sarama 协议版本常量
0.8.2.x
>= 1.0.0
sarama.V0_8_2_0
0.9.0.x
>= 1.0.0
sarama.V0_9_0_0
0.10.0.x
>= 1.0.0
sarama.V0_10_0_0
0.10.1.x
>= 1.0.0
sarama.V0_10_1_0
0.10.2.x
>= 1.0.0
sarama.V0_10_2_0
0.11.0.x
>= 1.16.0
sarama.V0_11_0_0
1.0.x
>= 1.16.0
sarama.V1_0_0_0
1.1.x
>= 1.19.0
sarama.V1_1_0_0
2.0.x
>= 1.19.0
sarama.V2_0_0_0
2.1.x
>= 1.21.0
sarama.V2_1_0_0
2.2.x
>= 1.23.0
sarama.V2_2_0_0
2.3.x
>= 1.24.0
sarama.V2_3_0_0
2.4.x
>= 1.27.0
sarama.V2_4_0_0
2.5.x
>= 1.28.0
sarama.V2_5_0_0
2.6.x
>= 1.29.0
sarama.V2_6_0_0
2.7.x
>= 1.29.0
sarama.V2_7_0_0
2.8.x以及以上
建议使用>=1.42.1
sarama.V2_8_0_0-sarama.V3_6_0_0
上述列出的 Sarama 库版本是支持对应 Kafka 协议版本的最低版本。为了获得最佳性能和使用新功能,建议使用 Sarama 的最新版本。在使用最新版本时,客户可以通过设置 config.Version 来指定与您的 Kafka broker 兼容的协议版本。设置方式如下,务必先设置版本后使用,否则会有预期外的不兼容问题:
config := sarama.NewConfig()
config.Version = sarama.V2_7_0_0 // 根据实际Kafka版本设置协议版本

生产者参数与调优

生产者参数

在使用 Sarama Go 客户端写入 kafka 时候,需要配置如下关键参数,相关的参数和默认值如下:

config := sarama.NewConfig()
sarama.MaxRequestSize = 100 * 1024 * 1024 //请求最大大小,默认100MB,可以调整,写入大于100MB的消息会直接报错
sarama.MaxResponseSize = 100 * 1024 * 1024 //响应最大大小,默认100MB,可以调整,获取大于100MB的消息会直接报错

config.Producer.RequiredAcks = sarama.WaitForLocal // 默认值为sarama.WaitForLocal(1)

config.Producer.Retry.Max = 3 // 生产者重试的最大次数,默认为3
config.Producer.Retry.Backoff = 100 * time.Millisecond // 生产者重试之间的等待时间,默认为100毫秒

config.Producer.Return.Successes = false //是否返回成功的消息,默认为false
config.Producer.Return.Errors = true // 返回失败的消息,默认值为true

config.Producer.Compression = CompressionNone //对消息是否压缩后发送,默认CompressionNone不压缩
config.Producer.CompressionLevel = CompressionLevelDefault // 指定压缩等级,在配置了压缩算法后生效

config.Producer.Flush.Frequency = 0 //producer缓存消息的时间, 默认缓存0毫秒
config.Producer.Flush.Bytes = 0 // 达到多少字节时,触发一次broker请求,默认为0,直接发送,存在天然上限值MaxRequestSize,因此默认最大100MB
config.Producer.Flush.Messages = 0 // 达到多少条消息时,强制,触发一次broker请求,这个是上限值
config.Producer.Flush.MaxMessages = 0 // 最大缓存多少消息,默认为0,有消息立刻发送,MaxMessages设置大于0时,必须设置 Messages, 且需要保证:MaxMessages ≥ Messages

config.Producer.Timeout = 5 * time.Second // 超时时间

config.Producer.Idempotent = false //是否需要幂等,默认false
config.Producer.Transaction.Timeout = 1 * time.Minute // 事务超时时间默认1分钟
config.Producer.Transaction.Retry.Max = 50 //事务重试时间
config.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond
config.Net.MaxOpenRequests = 5 //默认值5,一次发送请求的数量
config.Net.DialTimeout = 5 * time.Second // 控制连接初始化超时时间
config.Producer.Transaction.ID = "test" //事务ID

config.ClientID = "your-client-id" // 客户端ID

参数说明调优

关于 RequiredAcks 参数优化

RequiredAcks 参数用于控制生产者发送消息时的确认机制。该参数的默认值为 WaitForLocal,表示消息发送给 Leader Broker 后,Leader 确认消息写入后即返回。RequiredAcks 参数还有以下可选值:
NoResponse: 不等待任何确认,直接返回。
WaitForLocal: 等待 Leader 副本确认写入后返回。
WaitForAll: 等待 Leader 副本以及相关的 Follower 副本确认写入后返回。
由上可知,在跨可用区场景,以及副本数较多的 Topic,RequiredAcks 参数的取值会影响消息的可靠性和吞吐量。因此:
在一些在线业务消息的场景下,吞吐量要求不大,可以将 RequiredAcks 参数设置为 WaitForAll,则可以确保消息被所有副本接收和确认后才返回,从而提高消息的可靠性。
在日志采集等大数据或者离线计算的场景下,要求高吞吐(即每秒写入 Kafka 的数据量)的情况下,可以将 RequiredAcks 设置为 WaitForLocal,提高吞吐。

关于 Flush 参数优化(缓存)

默认情况下,传输同等数据量的情况下,多次请求和一次请求的网络传输,一次请求传输能有效减少相关计算和网络资源,提高整体写入的吞吐量。因此,可以通过这个参数设置优化客户端发送消息的吞吐能力。在高吞吐场景下,可以配合计算和设置:
假设目标 Broker 吞吐量 ​16 MB/s,平均消息大小 ​1KB,容忍延迟 ​500ms​:
1. 计算批次大小​:
理论批次 = 16MB/s × 0.5s = 8MB
实际批次 = min(理论批次, MaxMessageBytes)
假设 MaxMessageBytes = 4MB
则推荐实际批次 Bytes = 4MB
2. 计算 Frequency​:
批次填充时间 = 4MB/16MB/s = 0.25s
推荐 Frequency = 0.25s × 1.2 ≈ 300ms //其中 1.2 为放大系数,主要用来避免空批次
3. Messages 设置​:
配套 Messages = (4MB/1KB) × 1.5 ≈ 6000 //其中 1.5 为放大系数,主要用来防止消息过小
4. MaxMessages 设置:
Messages 意味着生产者实例的本地内存缓冲区中允许累积的最大消息数量,MaxMessages 则意味着每次刷新操作(Flush)最多发送多少条消息,因此MaxMessages 必须 ≥ Messages,从而确保缓冲区中的消息全部被发送出去
// 高吞吐场景标准配置(吞吐≥16MB/s时)
config.Producer.Flush.Bytes = 4 * 1024 * 1024 // 4MB
config.Producer.Flush.Frequency = 300 * time.Millisecond
config.Producer.Flush.Messages = 6000 // 防小消息积压,可以设置更大值,如 10000
config.Producer.Flush.MaxMessages = 6000 // 参考 Messages 参数,可以设置为更大值,如 10000,需要确保 MaxMessages ≥ Messages

关于事务参数优化


config.Producer.Idempotent = true //是否需要幂等,在事务场景下需要设置为true
config.Producer.Transaction.Timeout = 1 * time.Minute // 事务超时时间默认1分钟
config.Producer.Transaction.Retry.Max = 50 //事务重试时间
config.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond
config.Net.MaxOpenRequests = 5 //默认值5,一次发送请求的数量
config.Producer.Transaction.ID = "test" //事务ID

需要强调,事务因为要保障消息的 exactly once 语义,因此会额外付出更多的计算资源,所以 config.Net.MaxOpenRequests 的选取必须小于等于5,Broker 端的 ProducerStateManager 实例会缓存每个 PID 在每个 Topic-Partition 上发送的最近 5 个 batch 数据,如果客户在事务的基础上还需要保持一定的吞吐,因此可以设置该值为5,同时适当增加事务超时时间,容忍高负载下一些网络抖动带来的时延问题。

关于压缩参数优化

Sarama Go 支持如下压缩参数:
config.Producer.Compression = CompressionNone //对消息是否压缩后发送,默认CompressionNone不压缩
config.Producer.CompressionLevel = CompressionLevelDefault //指定压缩等级,在配置了压缩算法后生效
在Sarama Kafka Go客户端中,支持以下几种压缩配置:
1. sarama.CompressionNone:不使用压缩。
2. sarama.CompressionGZIP:使用 GZIP 压缩。
3. sarama.CompressionSnappy:使用 Snappy 压缩。
4. sarama.CompressionLZ4:使用 LZ4 压缩。
5. sarama.CompressionZSTD:使用 ZSTD 压缩。
要在 Sarama Kafka Go 客户端中使用压缩消息,需要在创建生产者时设置 config.Producer.Compression 参数。例如,要使用 LZ4 压缩算法,可以将config.Producer.Compression 设置为 sarama.CompressionLZ4 ,虽然压缩消息的压缩和解压缩,发生在客户端,是一种用计算换带宽的优化方式,但是由于Broker 针对压缩消息存在校验行为会付出额外的计算成本,尤其是 GZIP 压缩,Broker 对其校验计算成本会比较大,在某种程度上可能会出现得不偿失的情况,反而因为计算的增加导致Broker消息处理能力偏低,导致带宽吞吐更低。在低吞吐或者低规格服务下,不建议使用压缩消息。如果还是需要压缩消息,这种情况建议可以使用如下方式进行使用:
1. 在 Producer 端对消息数据独立压缩,生成压缩包数据:messageCompression,同时在消息的 key 存储压缩方式:
{"Compression","CompressionLZ4"}
2. 在Producer端将messageCompression当成正常消息发送。
3. 在 Consumer 端读取消息key,获取使用的压缩方式,独立进行解压缩。

关于压缩参数内存优化

客户使用 Go 语言 Sarama 客户端并采用 LZ4 压缩算法压缩消息,LZ4 压缩消息在服务端需要解压缩进行校验,Sarama 客户端的 LZ4 压缩默认参数配置相较于官方 Java 客户端会申请更多内存,导致内存消耗过快。由于内存申请时间增长,Kafka 在处理生产请求时会导致大量处理线程在等待同分区锁的释放,容易引发请求队列持续打满,造成生产耗时增高,CPU 利用率增高,影响 Kafka 集群的生产消费。因此建议客户优化如下配置:
Go Sarama 客户端:LZ4 消息解压内存默认申请值为 4 MB,请求数翻倍时对共享集群冲击较大,因此建议客户配置为:64 KB。Java 客户端:LZ4 消息解压内存默认申请值 64 KB。
示例如下:
lz4.DefaultBlockSizeOption = lz4.BlockSizeOption(lz4.Block64Kb)

创建生产者实例

如果应用程序需要更高的吞吐量,则可以使用异步生产者,以提高消息的发送速度。同时,可以使用批量发送消息的方式,以减少网络开销和 IO 消耗。如果应用程序需要更高的可靠性,则可以使用同步生产者,以确保消息发送成功。同时,可以使用 ACK 确认机制和事务机制,以确保消息的可靠性和一致性。具体的参数调优参考生产者参数与调优。

同步生产者

在 Sarama Kafka Go 客户端中,有两种类型的生产者:同步生产者和异步生产者。它们的主要区别在于发送消息的方式和处理消息结果的方式。同步生产者:同步生产者在发送消息时会阻塞当前线程,直到消息发送完成并收到服务器的确认。因此,同步生产者的吞吐量较低,但是可以立即知道消息是否发送成功。示例如下:
package main

import (
"fmt"
"log"

"github.com/Shopify/sarama"
)

func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Return.Errors = true

brokers := []string{"localhost:9092"}
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
log.Fatalf("Failed to create producer: %v", err)
}
defer producer.Close()

msg := &sarama.ProducerMessage{
Topic: "test",
Value: sarama.StringEncoder("Hello, World!"),
}

partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Printf("Failed to send message: %v", err)
} else {
fmt.Printf("Message sent to partition %d at offset %d\\n", partition, offset)
}
}

异步生产者

异步生产者:异步生产者在发送消息时不会阻塞当前线程,而是将消息放入一个内部的发送队列,然后立即返回。因此,异步生产者的吞吐量较高,但是需要通过回调函数来处理消息结果。
package main

import (
"fmt"
"log"

"github.com/Shopify/sarama"
)

func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Return.Errors = true

brokers := []string{"localhost:9092"}
producer, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
log.Fatalf("Failed to create producer: %v", err)
}
defer producer.Close()

msg := &sarama.ProducerMessage{
Topic: "test",
Value: sarama.StringEncoder("Hello, World!"),
}

producer.Input() <- msg

select {
case success := <-producer.Successes():
fmt.Printf("Message sent to partition %d at offset %d\\n", success.Partition, success.Offset)
case err := <-producer.Errors():
log.Printf("Failed to send message: %v", err)
}
}

消费者实践

版本选择

在选择 Sarama 客户端版本时,需要确保所选版本与 Kafka broker 版本兼容。Sarama 库支持多个 Kafka 协议版本,可以通过设置 config.Version 来指定使用的协议版本。
config := sarama.NewConfig()
config.Version = sarama.V2_8_2_0

消费者参数与调优


config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRange //消费者分配分区的默认方式
config.Consumer.Offsets.Initial = sarama.OffsetNewest //在没有提交位点情况下,使用最新的位点还是最老的位点,默认是最新的消息位点
config.Consumer.Offsets.AutoCommit.Enable = true //是否支持自动提交位点,默认支持
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second //自动提交位点时间间隔,默认1s
config.Consumer.MaxWaitTime = 250 * time.Millisecond //在没有最新消费消息时候,客户端等待的时间,默认250ms
config.Consumer.MaxProcessingTime = 100 * time.Millisecond
config.Consumer.Fetch.Min = 1 //消费请求中获取的最小消息字节数,Broker将等待至少这么多字节的消息然后返回。默认值为1,不能设置0,因为0会导致在没有消息可用时消费者空转。
config.Consumer.Fetch.Max = 0 //消费请求最大的字节数。默认为0,表示不限制
config.Consumer.Fetch.Default = 1024 * 1024 //消费请求的默认消息字节数(默认为1MB),需要大于实例的大部分消息,否则Broker会花费大量时间计算消费数据是否达到这个值的条件
config.Consumer.Return.Errors = true

config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRange // 设置消费者组在进行rebalance时所使用的策略为NewBalanceStrategyRange,默认NewBalanceStrategyRange
config.Consumer.Group.Rebalance.Timeout = 60 * time.Second // 设置rebalance操作的超时时间,默认60s
config.Consumer.Group.Session.Timeout = 10 * time.Second // 设置消费者组会话的超时时间为,默认为10s
config.Consumer.Group.Heartbeat.Interval = 3 * time.Second // 心跳超时时间,默认为3s
config.Consumer.MaxProcessingTime = 100 * time.Millisecond //消息处理的超时时间,默认100ms

config.Net.DialTimeout = 5 * time.Second // 控制连接初始化超时时间

参数说明与调优

一般消费主要是rebalance时间频繁和消费线程阻塞问题,参考以下说明参数优化:
config.Consumer.Group.Session.Timeout:v0.10.2之前的版本可适当提高该参数值,需要大于消费一批数据的时间,但不要超过30s,建议设置为25s;而v0.10.2及其之后的版本,保持默认值10s 即可。
config.Consumer.Group.Heartbeat.Interval:默认3s,设置该值 需要小于Consumer.Group.Session.Timeout/3。
config.Consumer.Group.Rebalance.Timeout:默认60s,如果分区数和消费者较多,建议适当调大该值。
config.Consumer.MaxProcessingTime:该值要大于<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)的值。
注意:
根据需求调大 MaxProcessingTime 时间。
针对处理时间大于 MaxProcessingTime 请求处理时间进行监控,采样打印超时时间。

创建消费者实例

Sarama 提供订阅的模型创建消费者,其中在提交位点方面,提供手动提交位点和自动提交位点两种方式。

自动提交位点

自动提交位点:消费者在拉取消息后会自动提交位点,无需手动操作。这种方式的优点是简单易用,但是可能会导致消息重复消费或丢失。
package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"
"time"

"github.com/Shopify/sarama"
)

func main() {
config := sarama.NewConfig()
config.Version = sarama.V2_1_0_0
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second

brokers := []string{"localhost:9092"}
topic := "test-topic"

client, err := sarama.NewConsumerGroup(brokers, "test-group", config)
if err != nil {
log.Fatalf("unable to create kafka consumer group: %v", err)
}
defer client.Close()

ctx, cancel := context.WithCancel(context.Background())
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()

for {
err := client.Consume(ctx, []string{topic}, &consumerHandler{})
if err != nil {
log.Printf("consume error: %v", err)
}

select {
case <-signals:
cancel()
return
default:
}
}
}()

wg.Wait()
}

type consumerHandler struct{}

func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
return nil
}

func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

func (h *consumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("Received message: key=%s, value=%s, partition=%d, offset=%d\\n", string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)
sess.MarkMessage(msg, "")
}
return nil
}

手动提交位点

手动提交位点:消费者在处理完消息后需要手动提交位点。这种方式的优点是可以精确控制位点的提交,避免消息重复消费或丢失。但是需要注意,手动提交位点如果太频繁会导致 Broker CPU 很高,影响性能,随着消息量增加,CPU 消费会很高,影响正常 Broker 的其他功能,因此建议间隔一定消息提交位点。
package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"

"github.com/Shopify/sarama"
)

func main() {
config := sarama.NewConfig()
config.Version = sarama.V2_1_0_0
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.AutoCommit.Enable = false

brokers := []string{"localhost:9092"}
topic := "test-topic"

client, err := sarama.NewConsumerGroup(brokers, "test-group", config)
if err != nil {
log.Fatalf("unable to create kafka consumer group: %v", err)
}
defer client.Close()

ctx, cancel := context.WithCancel(context.Background())
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()

for {
err := client.Consume(ctx, []string{topic}, &consumerHandler{})
if err != nil {
log.Printf("consume error: %v", err)
}

select {
case <-signals:
cancel()
return
default:
}
}
}()

wg.Wait()
}

type consumerHandler struct{}

func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
return nil
}

func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

func (h *consumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("Received message: key=%s, value=%s, partition=%d, offset=%d\\n", string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)
sess.MarkMessage(msg, "")
sess.Commit()
}
return nil
}

Sarama Go 生产消费常见问题

1. 配置了手动提交位点,但是位点在控制台查询消费组时候没有更新。
无论配置了手动提交位点还是自动提交位点,都需要先进行标记,sess.MarkMessage(msg, ""),表示该消息已经被消费完,然后才能提交位点。
2. Sarama Go 作为消费者的一些问题,Sarama Go 版本客户端存在以下已知问题:
2.1 当Topic新增分区时,Sarama Go客户端无法感知并消费新增分区,需要客户端重启后,才能消费到新增分区。
2.2 当Sarama Go客户端同时订阅两个以上的Topic时,有可能会导致部分分区无法正常消费消息。
2.3 当Sarama Go客户端的消费位点重置策略设置为Oldest(earliest)时,如果客户端宕机或服务端版本升级,由于Sarama Go客户端自行实现OutOfRange机制,有可能会导致客户端从最小位点开始重新消费所有消息。
2.4 对于该问题:Confluent Go客户端的Demo地址,请访问 kafka-confluent-go-demo
3. 出现报错:Failed to produce message to topic。
原因可能为版本没有对齐,此时客户先确定kafka Broker的版本,然后指定版本:
config := sarama.NewConfig()
config.Version = sarama.V2_1_0_0


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈