tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

tRpc Go SDK

PDF
聚焦模式
字号
最后更新时间: 2026-01-20 15:59:40

背景

TDMQ CKafka 是一个分布式流处理平台,用于构建实时数据管道和流式应用程序。它提供了高吞吐量、低延迟、可伸缩性和容错性等特性。
本文着重介绍 tRpc-Go-Kafka 客户端的关键参数和实践教程,以及常见问题。

调优实践

tRPC-GO-Kafka 封装开源 Kafka SDK,利用 tRPC-Go 拦截器等功能,接入 tRPC-Go 生态。因此实践教程参见 Sarama Go

常见问题

生产者问题

1. 使用 CKafka 生产消息时,出现错误 Message contents does not match its CRC
err:type:framework, code:141, msg:kafka client transport SendMessage: kafka server: Message contents does not match its CRC.
插件默认启用了 gzip 压缩,在 target 上加上参数 compression=none 关闭压缩即可。
target: kafka://ip1:port1?compression=none
2. 生产时同一用户需要有序,如何配置?
客户端增加参数 partitioner,可选 random(默认),roundrobin,hash(按 key 分区)。
target: kafka://ip1:port1?clientid=xxx&partitioner=hash
3. 如何异步生产?
客户端增加参数 async=1
target: kafka://ip1:port1,ip2:port2?clientid=xxx&async=1
4. 如何使用异步生产写数据回调?
需要在代码中重写异步生产写数据的成功/失败的回调函数,例如:
import (
"git.code.oa.com/vicenteli/trpc-database/kafka"
)

func init() {
// 重写默认的异步生产写数据错误回调
kafka.AsyncProducerErrorCallback = func(err error, topic string, key, value []byte, headers []sarama.RecordHeader) {
// do something if async producer occurred error.
}

// 重写默认的异步生产写数据成功回调
kafka.AsyncProducerSuccCallback = func(topic string, key, value []byte, headers []sarama.RecordHeader) {
// do something if async producer succeed.
}
}

消费者问题

1. 如果消费时 Handle 返回了非 nil 会发生什么?
会休眠 3s 后重新消费,不建议这么做,因为返回错误会导致无限重试消费,失败的应该由业务做重试逻辑。
2. 使用 ckafka 消费消息时,出现错误 client has run out of available brokers to talk to
kafka server transport: consume fail:kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
优先检查 brokers 是否可达,然后检查支持的 kafka 客户端版本,尝试在配置文件 address 中加上参数例如 version=0.10.2.0
address: ip1:port1?topics=topic1&group=my-group&version=0.10.2.0
3. 当多个生产者生产时,部分生产者建立连接失败会影响正常的生产者超时?
请更新至 v0.2.18。 低版本插件在创建生产者时先加锁,再建立连接,建立连接结束后释放锁。如果存在一部分异常生产者建立连接耗时很长,就会导致其他正常生产请求在获取生产者的时候加锁失败,最终超时。此行为在 v0.2.18 已经修复。
4. 消费消息时,提示 The provider group protocol type is incompatible with the other members
kafka server transport: consume fail:kafka server: The provider group protocol type is incompatible with the other members.
同一消费者组的客户端重分组策略不一样,可修改参数 strategy,可选:sticky(默认),range,roundrobin。
address: ip1:port1?topics=topic12&group=my-group&strategy=range
5. 如何注入自定义配置(远端配置)?
如果需要代码中指定配置,先在trpc_go.yaml中配置 fake_address,然后配合 kafka.RegisterAddrConfig 方法注入,trpc_go.yaml配置如下:
address: fake_address
在服务启动前,注入自定义配置:
func main() {
s := trpc.NewServer()
// 使用自定义 addr,需在启动 server 前注入
cfg := kafka.GetDefaultConfig()
cfg.Brokers = []string{"127.0.0.1:9092"}
cfg.Topics = []string{"test_topic"}
kafka.RegisterAddrConfig("fake_address", cfg)
kafka.RegisterKafkaConsumerService(s.Service("fake_address"), &Consumer{})

s.Serve()
}
6. 如何获取底层 sarama 的上下文信息?
通过 kafka.GetRawSaramaContext 可以获取底层 sarama ConsumerGroupSessionConsumerGroupClaim。但是此处暴露这两个接口只是方便用户做监控日志,应该只使用其读方法,调用任何写方法在这里都是未定义行为,可能造成未知结果。
// RawSaramaContext 存放 sarama ConsumerGroupSession 和 ConsumerGroupClaim
// 导出此结构体是为了方便用户实现监控,提供的内容仅用于读,调用任何写方法属于未定义行为
type RawSaramaContext struct {
Session sarama.ConsumerGroupSession
Claim sarama.ConsumerGroupClaim
}
使用实例:
func (Consumer) Handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
if rawContext, ok := kafka.GetRawSaramaContext(ctx); ok {
log.Infof("InitialOffset: %d", rawContext.Claim.InitialOffset())
}
// ...
return nil
}



帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈