Message contents does not match its CRC。err:type:framework, code:141, msg:kafka client transport SendMessage: kafka server: Message contents does not match its CRC.compression=none 关闭压缩即可。target: kafka://ip1:port1?compression=nonepartitioner,可选 random(默认),roundrobin,hash(按 key 分区)。target: kafka://ip1:port1?clientid=xxx&partitioner=hashasync=1target: kafka://ip1:port1,ip2:port2?clientid=xxx&async=1func init() {// 重写默认的异步生产写数据错误回调kafka.AsyncProducerErrorCallback = func(err error, topic string, key, value []byte, headers []sarama.RecordHeader) {// do something if async producer occurred error.}// 重写默认的异步生产写数据成功回调kafka.AsyncProducerSuccCallback = func(topic string, key, value []byte, headers []sarama.RecordHeader) {// do something if async producer succeed.}}
client has run out of available brokers to talk to。kafka server transport: consume fail:kafka: client has run out of available brokers to talk to (Is your cluster reachable?)address: ip1:port1?topics=topic1&group=my-group&version=0.10.2.0 The provider group protocol type is incompatible with the other members。kafka server transport: consume fail:kafka server: The provider group protocol type is incompatible with the other members. strategy,可选:sticky(默认),range,roundrobin。address: ip1:port1?topics=topic12&group=my-group&strategy=rangetrpc_go.yaml中配置 fake_address,然后配合 kafka.RegisterAddrConfig 方法注入,trpc_go.yaml配置如下:address: fake_address
func main() {s := trpc.NewServer()// 使用自定义 addr,需在启动 server 前注入cfg := kafka.GetDefaultConfig()cfg.Brokers = []string{"127.0.0.1:9092"}cfg.Topics = []string{"test_topic"}kafka.RegisterAddrConfig("fake_address", cfg)kafka.RegisterKafkaConsumerService(s.Service("fake_address"), &Consumer{})s.Serve()}
通过kafka.GetRawSaramaContext可以获取底层 saramaConsumerGroupSession和ConsumerGroupClaim。但是此处暴露这两个接口只是方便用户做监控日志,应该只使用其读方法,调用任何写方法在这里都是未定义行为,可能造成未知结果。// RawSaramaContext 存放 sarama ConsumerGroupSession 和 ConsumerGroupClaim// 导出此结构体是为了方便用户实现监控,提供的内容仅用于读,调用任何写方法属于未定义行为type RawSaramaContext struct {Session sarama.ConsumerGroupSessionClaim sarama.ConsumerGroupClaim}
func (Consumer) Handle(ctx context.Context, msg *sarama.ConsumerMessage) error {if rawContext, ok := kafka.GetRawSaramaContext(ctx); ok {log.Infof("InitialOffset: %d", rawContext.Claim.InitialOffset())}// ...return nil}
文档反馈