Confluent Go SDK

Last updated: 2026-01-20 17:10:13

Background

TDMQ for CKafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. It provides high throughput, low latency, scalability, and fault tolerance. Two Go client libraries are commonly used with Kafka:
Sarama: a Kafka library developed by Shopify, providing producers, consumers, partition consumers, and related functionality. It offers good performance and has active community support.
Confluent-Kafka-Go: a Kafka library developed by Confluent, providing easy-to-use high-level APIs. It is built on the librdkafka C library and offers excellent performance, but its installation and usage are slightly more complex.
This document introduces the key parameters, practical tutorials, and common issues of the Confluent Go client.

Producer Practices

Version Selection

When using the Confluent Go SDK, specify the address of the Kafka cluster via the "bootstrap.servers" parameter. Rather than pinning a Broker version, set the "api.version.request" parameter to true so that the Confluent Go SDK automatically detects the Broker's supported API versions on startup.
config := &kafka.ConfigMap{
	"bootstrap.servers":   "localhost",
	"api.version.request": true,
}

Producer Parameters and Tuning

Producer Parameters

Confluent Go is developed based on librdkafka. When using the Confluent Go client to write to Kafka, the configured parameters are passed through to librdkafka. The key parameters and their default values are as follows:

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	config := &kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"acks":              "-1",      // acks mode, default value is -1
		"client.id":         "rdkafka", // Client ID
		"compression.type":  "none",    // Compression method
		"compression.level": "-1",      // Compression level
		"batch.num.messages":           "10000",      // A batch aggregates up to 10,000 messages into one MessageSet by default, improving send performance
		"batch.size":                   "1000000",    // Size limit for one MessageSet, default maximum 1000000 bytes
		"queue.buffering.max.ms":       "5",          // Wait up to 5 ms by default to batch messages before constructing MessageSets to transmit to the Broker
		"queue.buffering.max.messages": "100000",     // Maximum number of messages queued in the producer, default 100000
		"queue.buffering.max.kbytes":   "1048576",    // Maximum total size of messages queued in the producer, default 1048576 KB
		"message.send.max.retries":     "2147483647", // Retry count, default 2147483647
		"retry.backoff.ms":             "100",        // Retry interval, default 100 ms
		"socket.timeout.ms":            "60000",      // Network request timeout, default 60s
		"max.in.flight.requests.per.connection": "1000000", // Maximum number of in-flight requests per connection, default 1000000
		"max.in.flight":                         "1000000", // Alias for max.in.flight.requests.per.connection, same meaning
	}

	producer, err := kafka.NewProducer(config)
	if err != nil {
		panic(fmt.Sprintf("Failed to create producer: %s", err))
	}

	// Use the producer to send messages and perform other operations...

	// Close the producer
	producer.Close()
}

Parameter Description and Tuning

On the Optimization of the max.in.flight.requests.per.connection Parameter
Confluent Go is developed based on librdkafka. In librdkafka, max.in.flight.requests.per.connection is the number of requests that can be in flight concurrently on a single connection, with a default value of 1000000. The max.in.flight parameter is an alias for max.in.flight.requests.per.connection; both have the same meaning. In the standard Java SDK, the same parameter defaults to 5. Setting this value too high can put pressure on the server and lead to stability issues, so it is recommended to align the Confluent Go setting with the open-source Java SDK and set it to 5:
max.in.flight.requests.per.connection: 5
max.in.flight: 5
On acks Parameter Optimization
The acks parameter controls the acknowledgment mechanism for messages sent by the producer. Its default value is -1, meaning a send returns only after the message has been written to the Leader Broker and fully replicated to the corresponding Followers. The acks parameter also supports the values 0 and 1. In cross-AZ scenarios, and for topics with many replicas, the acks value affects both message reliability and throughput.
For online business messages where throughput requirements are modest, set acks to -1 so that a send returns only after all replicas have received and acknowledged the message, enhancing reliability.
For scenarios such as log collection, big data, or offline computing that require high throughput (i.e., a large amount of data written to Kafka per second), setting acks to 1 improves throughput while maintaining acceptable reliability.
About buffering Parameter Optimization (Caching)
For the same amount of data, transmitting it in a single request rather than several smaller ones reduces the associated computation and network overhead, thereby increasing overall write throughput. This behavior can be tuned to optimize the client's send throughput: Confluent Kafka Go batches messages for 5 ms by default before sending. If messages are small, you can increase the queue.buffering.max.ms value appropriately.
About Compression Parameter Optimization
The Confluent Kafka Go client supports the following compression.type values:
none: no compression.
gzip: the GZIP compression algorithm.
snappy: the Snappy compression algorithm.
lz4: the LZ4 compression algorithm.
zstd: the ZSTD compression algorithm.
To use a compression algorithm in the Producer client, set the compression.type parameter when creating the producer. For example, to use the LZ4 compression algorithm, set compression.type to lz4. Although client-side CPU compression and decompression represent a compute-for-bandwidth optimization approach, Brokers incur additional computational costs for validating compressed messages—particularly with Gzip compression, where server-side computational overhead can be substantial. In some cases, this may prove counterproductive, as increased computation reduces Broker message processing capacity and ultimately lowers bandwidth throughput. For such scenarios, consider the following approach:
1. In the Producer, compress each message independently to produce a compressed payload (call it messageCompression), and store the compression method in the message key, for example:
{"Compression": "CompressionLZ4"}
2. From the Producer, send messageCompression as a normal message.
3. On the Consumer side, read the compression method from the message key and decompress independently.
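As a runnable illustration of the three steps above, the sketch below round-trips a payload through producer-side compression and consumer-side decompression. It uses Go's standard-library gzip in place of LZ4 (which has no standard-library implementation); the names compressPayload and decompressPayload and the key value "CompressionGZIP" are illustrative, not part of any SDK.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// compressPayload compresses a message body before it is handed to the producer.
func compressPayload(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompressPayload reverses compressPayload on the consumer side, after the
// consumer has read the compression method from the message key.
func decompressPayload(data []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	original := []byte("hello world")
	compressed, _ := compressPayload(original)
	// The producer would send `compressed` as the message value, with a key
	// such as {"Compression": "CompressionGZIP"} identifying the algorithm.
	restored, _ := decompressPayload(compressed)
	fmt.Println(string(restored)) // prints "hello world"
}
```

Because the Broker sees only opaque bytes, it performs no compression-related validation, which is the point of this workaround.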

Create Producer Instance

If the application requires higher throughput, an asynchronous producer can be used to increase message delivery speed. Simultaneously, messages can be sent in batches to reduce network overhead and IO consumption. If the application demands higher reliability, a synchronous producer should be used to ensure successful message delivery. Additionally, the ACK confirmation mechanism and transaction mechanism can be employed to guarantee message reliability and consistency. For specific parameter tuning, refer to Producer Parameters and Tuning.

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Configure the Kafka producer
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers":  "localhost:9092",
		"acks":               "1",
		"compression.type":   "none",
		"batch.num.messages": "1000",
		"max.in.flight.requests.per.connection": "5",
		"max.in.flight":                         "5",
	})
	if err != nil {
		fmt.Printf("Failed to create producer: %s\n", err)
		return
	}

	// Send messages
	for i := 0; i < 10; i++ {
		topic := "test-topic"
		value := fmt.Sprintf("hello world %d", i)
		message := &kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(value),
		}
		p.Produce(message, nil)
	}

	// Flush pending messages (waiting up to 15 s) and close the producer
	p.Flush(15 * 1000)
	p.Close()
}

Consumer Practices

Consumer Parameters and Tuning

Consumer Parameters


package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Configure the Kafka consumer
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "test-group",
		"auto.offset.reset": "earliest",
		"fetch.min.bytes":   1,        // Minimum fetch size in bytes
		"fetch.max.bytes":   52428800, // Maximum fetch size in bytes
		"fetch.wait.max.ms": "500",    // If there is no new message to consume, wait 500 ms by default
		"enable.auto.commit":      true,   // Whether offsets are committed automatically, default is true
		"auto.commit.interval.ms": 5000,   // Auto-commit interval for offsets, default is 5s
		"max.poll.interval.ms":    300000, // Maximum delay between two poll operations, default is 5 minutes
		"session.timeout.ms":      45000,  // Session timeout, default is 45s
		"heartbeat.interval.ms":   3000,   // Heartbeat interval, default is 3s
	})
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		return
	}

	// Subscribe to a topic
	c.SubscribeTopics([]string{"test-topic"}, nil)

	// Poll and process messages
	for {
		ev := c.Poll(100)
		if ev == nil {
			continue
		}

		switch e := ev.(type) {
		case *kafka.Message:
			fmt.Printf("Received message: %s\n", string(e.Value))
			c.CommitMessage(e)
		case kafka.Error:
			fmt.Printf("Error: %v\n", e)
		}
	}
}

Parameter Description and Tuning

1. max.poll.interval.ms is a Kafka Consumer configuration parameter that specifies the maximum delay between two poll operations. Its primary purpose is to control the Consumer's liveness, i.e., to determine whether the Consumer is still active. If the Consumer fails to poll within the time specified by max.poll.interval.ms, Kafka considers it dead and triggers a rebalance. Adjust this parameter based on actual consumption speed: if set too small, it may cause frequent rebalances and increase Kafka's load; if set too large, it may prevent Kafka from promptly detecting problematic Consumers, thereby affecting message consumption.
2. Common consumption issues mainly involve frequent rebalances and consumer thread blocking. Refer to the following parameter tuning instructions:
2.1 session.timeout.ms: For versions earlier than v0.10.2, increase this value appropriately; it should exceed the time taken to consume a batch of data but not exceed 30s, with 25s recommended. For v0.10.2 and later, keep the default value of 10s.
2.2 heartbeat.interval.ms: The default is 3s. When setting this value, keep it below session.timeout.ms / 3.
2.3 max.poll.interval.ms: The default is 5 minutes. If there are many partitions and consumers, increase this value appropriately. It should be greater than <max.poll.records> / (<number of messages consumed per second per thread> × <number of consumption threads>).
Note:
If message processing is synchronous (pull a message, process it, then pull the next), make the following adjustments:
Increase the max.poll.interval.ms value as needed.
Monitor requests whose processing time exceeds max.poll.interval.ms, and sample and log the timeout durations.
3. For auto-commit offset requests, it is recommended not to set auto.commit.interval.ms below 1000 ms. Excessively frequent offset commits drive up Broker CPU usage and affect reads and writes of other normal services.
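As a worked example of the lower bound in 2.3, the sketch below computes a minimum max.poll.interval.ms under assumed figures (max.poll.records = 500, 50 messages handled per second per thread, 2 consumption threads); the helper minPollIntervalMs is hypothetical, not an SDK function.

```go
package main

import "fmt"

// minPollIntervalMs returns the smallest safe max.poll.interval.ms (in ms)
// given how many records one poll can return and how fast they are processed.
func minPollIntervalMs(maxPollRecords, msgsPerSecPerThread, threads int) int {
	seconds := float64(maxPollRecords) / float64(msgsPerSecPerThread*threads)
	return int(seconds * 1000)
}

func main() {
	// 500 records / (50 msgs/s * 2 threads) = 5 s, so max.poll.interval.ms
	// must stay above 5000 ms for this consumer.
	fmt.Println(minPollIntervalMs(500, 50, 2)) // prints 5000
}
```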

Create Consumer Instance

Confluent Go provides a subscription-based approach to create consumers, offering two methods for offset commits: manual offset commit and automatic offset commit.

Automatic Offset Commit

Automatic offset commit: After polling messages, the consumer automatically commits offsets without manual intervention. This approach offers simplicity and ease of use but may lead to duplicate message consumption or message loss.

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Configure the Kafka consumer
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":       "localhost:9092",
		"group.id":                "test-group",
		"auto.offset.reset":       "earliest",
		"enable.auto.commit":      true,   // Enable automatic offset commit
		"auto.commit.interval.ms": 5000,   // Commit offsets automatically every 5000 ms (5 s)
		"max.poll.interval.ms":    300000, // Maximum delay between two poll operations: 300000 ms (5 minutes)
		"session.timeout.ms":      10000,  // Session timeout between the consumer and the Broker: 10 s
		"heartbeat.interval.ms":   3000,   // Interval at which the consumer sends heartbeats: 3000 ms (3 s)
	})
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		return
	}

	// Subscribe to a topic
	c.SubscribeTopics([]string{"test-topic"}, nil)

	// Consume messages; offsets are committed automatically
	for {
		ev := c.Poll(100)
		if ev == nil {
			continue
		}

		switch e := ev.(type) {
		case *kafka.Message:
			fmt.Printf("Received message: %s\n", string(e.Value))
		case kafka.Error:
			fmt.Printf("Error: %v\n", e)
		}
	}
}

Manual Offset Commit

Manual offset commit: After processing messages, the consumer needs to manually commit offsets. The advantage of this approach is precise control over offset commits, preventing duplicate message consumption or message loss. However, note that overly frequent manual offset commits can cause high CPU usage on the Broker, impacting performance. As message volume increases, high CPU consumption may affect other features of the Broker. Therefore, it is recommended to commit offsets at certain message intervals.

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Configure the Kafka consumer
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":     "localhost:9092",
		"group.id":              "test-group",
		"auto.offset.reset":     "earliest",
		"enable.auto.commit":    false, // Disable automatic offset commit
		"max.poll.interval.ms":  300000,
		"session.timeout.ms":    10000,
		"heartbeat.interval.ms": 3000,
	})
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		return
	}

	// Subscribe to a topic
	c.SubscribeTopics([]string{"test-topic"}, nil)

	// Manually commit offsets
	for {
		ev := c.Poll(100)
		if ev == nil {
			continue
		}

		switch e := ev.(type) {
		case *kafka.Message:
			fmt.Printf("Received message: %s\n", string(e.Value))
			c.CommitMessage(e)
		case kafka.Error:
			fmt.Printf("Error: %v\n", e)
		}
	}
}
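The loop above commits after every message; to follow the recommendation of committing at message intervals instead, the sketch below counts processed messages and commits once every N. The batchCommitter type is illustrative, and commitFn stands in for a real c.CommitMessage / c.Commit call, so the logic runs without a broker.

```go
package main

import "fmt"

// batchCommitter commits once every `interval` processed messages instead of
// after every message, reducing offset-commit pressure on the Broker.
type batchCommitter struct {
	interval int
	count    int
	commitFn func() // stands in for c.CommitMessage(msg) or c.Commit()
}

// record is called once per processed message and triggers a commit whenever
// the running count reaches a multiple of the interval.
func (b *batchCommitter) record() {
	b.count++
	if b.count%b.interval == 0 {
		b.commitFn()
	}
}

func main() {
	commits := 0
	bc := &batchCommitter{interval: 100, commitFn: func() { commits++ }}
	for i := 0; i < 1000; i++ {
		bc.record() // one call per consumed *kafka.Message
	}
	fmt.Println(commits) // 1000 messages at interval 100 -> prints 10
}
```

A crash between commits re-delivers at most `interval` messages, so choose the interval to balance Broker load against acceptable re-processing.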


