TDMQ for CKafka

tRPC-Go SDK

Last updated: 2026-01-20 17:10:14

Background

TDMQ for CKafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. It provides high throughput, low latency, scalability, and fault tolerance.
This document introduces the key parameters of the tRPC-Go-Kafka client, along with practical tips and common issues.

Tuning Practices

tRPC-Go-Kafka encapsulates the open-source Sarama Kafka SDK and leverages tRPC-Go features such as interceptors to integrate into the tRPC-Go ecosystem. For practical tuning guidance, therefore, refer to Sarama Go.

FAQs

Producer Issues

1. When using CKafka to produce messages, the following error occurs: Message contents does not match its CRC.
err:type:framework, code:141, msg:kafka client transport SendMessage: kafka server: Message contents does not match its CRC.
The plugin enables gzip compression by default. Add the parameter compression=none to the target to disable compression:
target: kafka://ip1:port1?compression=none
2. During message production, if messages from the same user need to be ordered, how can this be configured?
Add the parameter partitioner to the client target. Options are random (default), roundrobin, and hash (partition by message key):
target: kafka://ip1:port1?clientid=xxx&partitioner=hash
3. How to implement asynchronous message production?
Add the parameter async=1 to the client target:
target: kafka://ip1:port1,ip2:port2?clientid=xxx&async=1
4. How to implement callback for asynchronous message production?
You need to override the default success/failure callbacks for asynchronous message production in your code, for example:
import (
	"git.code.oa.com/vicenteli/trpc-database/kafka"

	"github.com/Shopify/sarama"
)

func init() {
	// Override the default error callback for asynchronous message production
	kafka.AsyncProducerErrorCallback = func(err error, topic string, key, value []byte, headers []sarama.RecordHeader) {
		// do something if the async producer encountered an error.
	}

	// Override the default success callback for asynchronous message production
	kafka.AsyncProducerSuccCallback = func(topic string, key, value []byte, headers []sarama.RecordHeader) {
		// do something if the async producer succeeded.
	}
}

Consumer Issues

1. What happens if Handle returns a non-nil error during message consumption?
The consumer sleeps for 3 seconds and then re-consumes the message. This is not recommended, because returning an error leads to indefinite retries of the same message; instead, handle failures with retry logic in the business layer.
2. When consuming messages with CKafka, the following error occurs: client has run out of available brokers to talk to.
kafka server transport: consume fail:kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
First, check whether the brokers are reachable. Then verify the Kafka client version supported by the server, and try adding a parameter such as version=0.10.2.0 to the address in the configuration file:
address: ip1:port1?topics=topic1&group=my-group&version=0.10.2.0
3. During concurrent message production, will producers that fail to establish connections affect the timeout behavior of healthy producers?
In versions earlier than v0.2.18, yes. There, the plugin acquires a lock when creating a producer, establishes the connection, and only releases the lock after the connection is established. If an abnormal producer takes a long time to connect, normal production requests cannot acquire the lock to obtain their producer and eventually time out. Upgrade to v0.2.18, where this has been fixed.
4. When consuming messages, the following error is displayed: The provider group protocol type is incompatible with the other members.
kafka server transport: consume fail:kafka server: The provider group protocol type is incompatible with the other members.
The rebalance strategies of the clients in the same consumer group are inconsistent. Modify the parameter strategy; options are sticky (default), range, and roundrobin:
address: ip1:port1?topics=topic12&group=my-group&strategy=range
5. How to inject custom configurations (remote configurations)?
If you need to specify configurations in the code, first configure fake_address in trpc_go.yaml, then inject it in conjunction with the kafka.RegisterAddrConfig method. The trpc_go.yaml configuration is as follows:
address: fake_address
Before the service starts, inject custom configurations:
func main() {
	s := trpc.NewServer()

	// To use a custom address, it must be injected before the server starts
	cfg := kafka.GetDefaultConfig()
	cfg.Brokers = []string{"127.0.0.1:9092"}
	cfg.Topics = []string{"test_topic"}
	kafka.RegisterAddrConfig("fake_address", cfg)
	kafka.RegisterKafkaConsumerService(s.Service("fake_address"), &Consumer{})

	s.Serve()
}
6. How to obtain the underlying sarama context information?
Through kafka.GetRawSaramaContext, you can obtain the underlying sarama ConsumerGroupSession and ConsumerGroupClaim. These two interfaces are exposed solely to facilitate user monitoring and logging: use only their read methods, because calling any write method is undefined behavior and may lead to unexpected results.
// RawSaramaContext holds the sarama ConsumerGroupSession and ConsumerGroupClaim.
// This structure is exported for the convenience of users implementing monitoring.
// Its contents are read-only; calling any write method is undefined behavior.
type RawSaramaContext struct {
	Session sarama.ConsumerGroupSession
	Claim   sarama.ConsumerGroupClaim
}
Usage example:
func (Consumer) Handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
	if rawContext, ok := kafka.GetRawSaramaContext(ctx); ok {
		log.Infof("InitialOffset: %d", rawContext.Claim.InitialOffset())
	}
	// ...
	return nil
}


