Q: What does the following producer error mean, and how do I fix it?

```
type:framework, code:141, msg:kafka client transport SendMessage: kafka server: Message contents does not match its CRC.
```

A: The message contents do not match their CRC checksum. Append `compression=none` to the target to disable compression:

```yaml
target: kafka://ip1:port1?compression=none
```

Q: How do I choose a partitioner?

A: Set the `partitioner` option on the target. Available options: `random` (the default), `roundrobin`, and `hash` (partitioned by message key):

```yaml
target: kafka://ip1:port1?clientid=xxx&partitioner=hash
```

Q: How do I produce messages asynchronously?

A: Add `async=1` to the client target:

```yaml
target: kafka://ip1:port1,ip2:port2?clientid=xxx&async=1
```

To handle the results of asynchronous production, override the default callbacks in an `init` function:

```go
import (
	"github.com/Shopify/sarama"

	"git.code.oa.com/vicenteli/trpc-database/kafka"
)

func init() {
	// Override the default error callback for asynchronous message production.
	kafka.AsyncProducerErrorCallback = func(err error, topic string, key, value []byte, headers []sarama.RecordHeader) {
		// Do something when the async producer reports an error.
	}
	// Override the default success callback for asynchronous message production.
	kafka.AsyncProducerSuccCallback = func(topic string, key, value []byte, headers []sarama.RecordHeader) {
		// Do something when the async producer succeeds.
	}
}
```
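For intuition on what `partitioner=hash` does, here is a minimal sketch (not the plugin's actual implementation) of how a hash partitioner maps a message key to a partition: hash the key, then take the result modulo the partition count. Sarama's default hash partitioner uses FNV-1a, which this sketch also assumes.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashPartition sketches how a hash partitioner assigns a key to a partition:
// FNV-1a hash of the key, reduced modulo the number of partitions.
func hashPartition(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key)
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

func main() {
	// Messages with the same key always land on the same partition,
	// which is what gives per-key ordering.
	fmt.Println(hashPartition([]byte("user-42"), 8) == hashPartition([]byte("user-42"), 8))
}
```

This is why `hash` is the right choice when consumers rely on per-key ordering, while `random` and `roundrobin` spread load without any ordering guarantee.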
Q: What does the following consumer error mean?

```
kafka server transport: consume fail:kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
```

A: Check that the cluster is reachable from your service and that the configured Kafka version matches the brokers. The broker version can be set explicitly in the address:

```yaml
address: ip1:port1?topics=topic1&group=my-group&version=0.10.2.0
```

Q: What does the following consumer error mean?

```
kafka server transport: consume fail:kafka server: The provider group protocol type is incompatible with the other members.
```

A: The consumer's rebalance strategy is incompatible with the other members of the same consumer group. Set the `strategy` option; available options: `sticky` (the default), `range`, and `roundrobin`:

```yaml
address: ip1:port1?topics=topic12&group=my-group&strategy=range
```

Q: How do I inject the consumer configuration in code instead of through the address?

A: Configure a placeholder `fake_address` in `trpc_go.yaml`, then inject the real configuration with the `kafka.RegisterAddrConfig` method. The `trpc_go.yaml` configuration is as follows:

```yaml
address: fake_address
```
```go
func main() {
	s := trpc.NewServer()
	// To use a custom addr, it must be injected before starting the server.
	cfg := kafka.GetDefaultConfig()
	cfg.Brokers = []string{"127.0.0.1:9092"}
	cfg.Topics = []string{"test_topic"}
	kafka.RegisterAddrConfig("fake_address", cfg)
	kafka.RegisterKafkaConsumerService(s.Service("fake_address"), &Consumer{})
	s.Serve()
}
```
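The address strings above pack a broker list and consumer options into one URL-like value. As a rough sketch (not the plugin's actual parser), such an address can be split into its parts with the standard library:

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parseKafkaAddr is an illustrative sketch that splits an address like
// "ip1:port1,ip2:port2?topics=t&group=g" into a broker list and an option map.
func parseKafkaAddr(addr string) ([]string, url.Values) {
	parts := strings.SplitN(addr, "?", 2)
	brokers := strings.Split(parts[0], ",")
	opts := url.Values{}
	if len(parts) == 2 {
		opts, _ = url.ParseQuery(parts[1])
	}
	return brokers, opts
}

func main() {
	brokers, opts := parseKafkaAddr("ip1:port1?topics=topic12&group=my-group&strategy=range")
	fmt.Println(brokers[0], opts.Get("group"), opts.Get("strategy"))
}
```

This makes it clear why every option shown in this FAQ (`topics`, `group`, `version`, `strategy`) rides in the query-string portion of the address, while everything before the `?` is the comma-separated broker list.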
Through `kafka.GetRawSaramaContext`, you can obtain the underlying sarama `ConsumerGroupSession` and `ConsumerGroupClaim`. These two interfaces are exposed solely to facilitate user monitoring and logging: only their read methods should be used, as calling any write method is undefined behavior and may lead to unknown results.

```go
// RawSaramaContext holds the sarama ConsumerGroupSession and ConsumerGroupClaim.
// The structure is exported for the convenience of users implementing monitoring.
// Its contents are read-only; calling any write method is undefined behavior.
type RawSaramaContext struct {
	Session sarama.ConsumerGroupSession
	Claim   sarama.ConsumerGroupClaim
}
```
```go
func (Consumer) Handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
	if rawContext, ok := kafka.GetRawSaramaContext(ctx); ok {
		log.Infof("InitialOffset: %d", rawContext.Claim.InitialOffset())
	}
	// ...
	return nil
}
```