Background
TDMQ CKafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. It provides features such as high throughput, low latency, scalability, and fault tolerance.
Two Kafka libraries are commonly used in Go:
Sarama: A Kafka library developed by Shopify, providing producers, consumers, and partition consumers. It offers good performance and has active community support.
Confluent-Kafka-Go: A Kafka library developed by Confluent, providing a high-level API that is easy to use. It is built on the librdkafka C library and offers excellent performance, though installation and usage are slightly more involved.
This article focuses on the key parameters, practical tutorials, and common issues of the Confluent Go client.
Producer Practices
Version Selection
When using the Confluent Go SDK, specify the address of the Kafka cluster with the "bootstrap.servers" parameter, and set the "api.version.request" parameter to true so that the Confluent Go SDK automatically detects the Broker version on startup.
config := &kafka.ConfigMap{
	"bootstrap.servers":   "localhost",
	"api.version.request": true,
}
Producer Parameters and Tuning
Producer Parameters
Confluent Go is developed on top of librdkafka. When the Confluent Go client writes to Kafka, its configuration parameters are passed through to librdkafka. The key parameters and their default values are as follows:
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	config := &kafka.ConfigMap{
		"bootstrap.servers":            "localhost:9092",
		"acks":                         "-1",
		"client.id":                    "rdkafka",
		"compression.type":             "none",
		"compression.level":            "-1",
		"batch.num.messages":           "10000",
		"batch.size":                   "1000000",
		"queue.buffering.max.ms":       "5",
		"queue.buffering.max.messages": "100000",
		"queue.buffering.max.kbytes":   "1048576",
		"message.send.max.retries":     "2147483647",
		"retry.backoff.ms":             "100",
		"socket.timeout.ms":            "60000",
		"max.in.flight.requests.per.connection": "1000000",
		"max.in.flight":                         "1000000", // alias for the line above
	}
	producer, err := kafka.NewProducer(config)
	if err != nil {
		panic(fmt.Sprintf("Failed to create producer: %s", err))
	}
	producer.Close()
}
Parameter Description and Tuning
On the Optimization of the max.in.flight.requests.per.connection Parameter
Confluent Go is developed based on librdkafka. In librdkafka, max.in.flight.requests.per.connection is the number of requests that can be in flight concurrently on a single connection, with a default value of 1000000. The max.in.flight parameter is an alias for max.in.flight.requests.per.connection; both mean the same thing. In the standard Java SDK, the same parameter defaults to 5. Setting this value too high can put pressure on the server and lead to stability issues, so it is recommended to align the Confluent Go setting with the open-source Java SDK and use 5.
max.in.flight.requests.per.connection:5
max.in.flight:5
On acks Parameter Optimization
The acks parameter controls the acknowledgment behavior for messages sent by the producer. Its default value is -1, meaning the producer returns only after the Leader Broker has written the message and all in-sync Follower replicas have replicated it. The acks parameter also supports the values 0 and 1. In cross-AZ scenarios and for topics with many replicas, the acks value affects both message reliability and throughput.
In scenarios involving online business messages where throughput requirements are not high, you can set the acks parameter to -1 to ensure that messages are returned only after being received and acknowledged by all replicas, thereby enhancing message reliability.
In scenarios such as log collection, big data, or offline computing where high throughput (i.e., the amount of data written to Kafka per second) is required, setting acks to 1 can maintain performance while also improving throughput.
About Buffering Parameter Optimization
For the same amount of data, sending it in a single network request rather than many small requests reduces per-request computational and network overhead and thereby increases overall write throughput.
Therefore, this parameter can be configured to optimize the client's message sending throughput. Confluent Kafka Go provides a default batching window of 5 ms to accumulate messages. If messages are small, you can appropriately increase the queue.buffering.max.ms value.
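As an illustrative sketch of this tuning (the values shown are assumptions for a workload of many small messages, not recommendations), a producer configuration might widen the batching window like this:

```go
config := &kafka.ConfigMap{
	"bootstrap.servers": "localhost:9092",
	// Wait up to 50 ms (instead of the 5 ms default) to accumulate a batch;
	// a larger window trades per-message latency for higher throughput.
	"queue.buffering.max.ms": "50",
	// Upper limit on the number of messages batched into one request.
	"batch.num.messages": "10000",
}
```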
About Compression Parameter Optimization
The Confluent Kafka Go client supports the following compression.type values:
none: No compression algorithm is used.
gzip: Uses the GZIP compression algorithm.
snappy: Uses the Snappy compression algorithm.
lz4: Uses the LZ4 compression algorithm.
zstd: Uses the ZSTD compression algorithm.
To use a compression algorithm in the Producer client, set the compression.type parameter when creating the producer; for example, set compression.type to lz4 to use LZ4. Client-side compression trades CPU for bandwidth, but Brokers incur additional computational cost to validate compressed messages, particularly with gzip, where the server-side overhead can be substantial. In some cases this is counterproductive: the extra computation reduces the Broker's message processing capacity and ultimately lowers throughput. For such scenarios, consider the following approach:
1. In the Producer, compress the message payload independently to produce a compressed packet (messageCompression), and record the compression method in the message's key:
{"Compression","CompressionLZ4"}
2. At the Producer end, send messageCompression as a normal message.
3. On the Consumer side, read the message key to obtain the compression method used and independently decompress.
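The Kafka send and receive steps are omitted in the sketch below; it only illustrates the application-level compress-and-tag round trip from the three steps above. Gzip from the Go standard library stands in for LZ4 (which would require a third-party package), and the key value "CompressionGZIP" is an illustrative convention, not an API.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// compress gzips the payload; the result would be sent as the Kafka message
// value, with the returned key recording the algorithm used.
func compress(payload []byte) ([]byte, string, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(payload); err != nil {
		return nil, "", err
	}
	if err := w.Close(); err != nil {
		return nil, "", err
	}
	return buf.Bytes(), "CompressionGZIP", nil
}

// decompress inspects the key read from the consumed message and reverses
// the matching algorithm; unknown keys pass the value through unchanged.
func decompress(key string, value []byte) ([]byte, error) {
	if key != "CompressionGZIP" {
		return value, nil
	}
	r, err := gzip.NewReader(bytes.NewReader(value))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	original := []byte("hello world")
	value, key, err := compress(original)
	if err != nil {
		panic(err)
	}
	restored, err := decompress(key, value)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(restored))
}
```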
Create Producer Instance
If the application requires higher throughput, an asynchronous producer can be used to increase message delivery speed. Simultaneously, messages can be sent in batches to reduce network overhead and IO consumption. If the application demands higher reliability, a synchronous producer should be used to ensure successful message delivery. Additionally, the ACK confirmation mechanism and transaction mechanism can be employed to guarantee message reliability and consistency. For specific parameter tuning, refer to Producer Parameters and Tuning.
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers":                     "localhost:9092",
		"acks":                                  "1",
		"compression.type":                      "none",
		"batch.num.messages":                    "1000",
		"max.in.flight.requests.per.connection": "5",
		"max.in.flight":                         "5",
	})
	if err != nil {
		fmt.Printf("Failed to create producer: %s\n", err)
		return
	}
	defer p.Close()
	for i := 0; i < 10; i++ {
		topic := "test-topic"
		value := fmt.Sprintf("hello world %d", i)
		message := &kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(value),
		}
		if err := p.Produce(message, nil); err != nil {
			fmt.Printf("Failed to enqueue message: %s\n", err)
		}
	}
	// Wait up to 15 seconds for outstanding messages to be delivered.
	p.Flush(15 * 1000)
}
Consumer Practices
Consumer Parameters and Tuning
Consumer Parameters
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":       "localhost:9092",
		"group.id":                "test-group",
		"auto.offset.reset":       "earliest",
		"fetch.min.bytes":         1,
		"fetch.max.bytes":         52428800,
		"fetch.wait.max.ms":       "500",
		"enable.auto.commit":      true,
		"auto.commit.interval.ms": 5000,
		"max.poll.interval.ms":    300000,
		"session.timeout.ms":      45000,
		"heartbeat.interval.ms":   3000,
	})
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		return
	}
	defer c.Close()
	c.SubscribeTopics([]string{"test-topic"}, nil)
	for {
		ev := c.Poll(100)
		if ev == nil {
			continue
		}
		switch e := ev.(type) {
		case *kafka.Message:
			// Offsets are committed automatically (enable.auto.commit=true).
			fmt.Printf("Received message: %s\n", string(e.Value))
		case kafka.Error:
			fmt.Printf("Error: %v\n", e)
		}
	}
}
Parameter Description and Tuning
1. max.poll.interval.ms is a configuration parameter of Kafka Consumer, which specifies the maximum delay between two poll operations. The primary purpose of this parameter is to control the Consumer's liveness, determining whether the Consumer is still active. If the Consumer fails to perform a poll operation within the time specified by max.poll.interval.ms, Kafka will consider the Consumer dead and trigger a rebalance operation. This parameter setting should be adjusted based on actual consumption speed. If set too small, it may cause frequent rebalance operations, increasing Kafka's burden; if set too large, it may prevent Kafka from timely detecting problematic Consumers, thereby affecting message consumption.
2. Common consumption issues mainly involve frequent rebalance times and consumer thread blocking. Refer to the following parameter optimization instructions:
2.1 session.timeout.ms: For versions prior to v0.10.2, increase this value appropriately: it should exceed the time needed to consume a batch of data, but not exceed 30s; 25s is recommended. For v0.10.2 and later versions, keep the default of 10s.
2.2 heartbeat.interval.ms: The default is 3s. When setting this value, keep it below one third of session.timeout.ms.
2.3 max.poll.interval.ms: default is 5 minutes. If the number of partitions and consumers is large, it is recommended to appropriately increase this value. The value should be greater than <max.poll.records> / (<number of messages consumed per second per thread> * <number of consumption threads>).
Note:
If message processing is synchronous (pull a message, process it, then pull the next), make the following adjustments:
Increase the max.poll.interval.ms value as needed.
Monitor requests whose processing time exceeds max.poll.interval.ms, and sample and log the timeout durations.
3. For automatic offset commits, it is recommended not to set auto.commit.interval.ms below 1000ms. Excessively frequent offset-commit requests can drive up Broker CPU usage and affect the read and write operations of other normal services.
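The rule of thumb in 2.3 can be checked with a quick calculation. All numbers below are illustrative assumptions: with max.poll.records = 500, 20 messages consumed per second per thread, and 5 consumption threads, max.poll.interval.ms must exceed 500 / (20 * 5) = 5 seconds.

```go
package main

import "fmt"

func main() {
	// Illustrative assumptions, not measurements.
	maxPollRecords := 500.0      // records returned per poll
	msgsPerSecPerThread := 20.0  // processing rate of one thread
	threads := 5.0               // number of consumption threads

	// Lower bound for max.poll.interval.ms from the rule of thumb, in seconds.
	minIntervalSec := maxPollRecords / (msgsPerSecPerThread * threads)
	fmt.Printf("max.poll.interval.ms must exceed %.0f ms\n", minIntervalSec*1000)
}
```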
Create Consumer Instance
Confluent Go provides a subscription-based approach to create consumers, offering two methods for offset commits: manual offset commit and automatic offset commit.
Automatic Offset Commit
Automatic offset commit: After polling messages, the consumer automatically commits offsets without manual intervention. This approach offers simplicity and ease of use but may lead to duplicate message consumption or message loss.
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":       "localhost:9092",
		"group.id":                "test-group",
		"auto.offset.reset":       "earliest",
		"enable.auto.commit":      true,
		"auto.commit.interval.ms": 5000,
		"max.poll.interval.ms":    300000,
		"session.timeout.ms":      10000,
		"heartbeat.interval.ms":   3000,
	})
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		return
	}
	defer c.Close()
	c.SubscribeTopics([]string{"test-topic"}, nil)
	for {
		ev := c.Poll(100)
		if ev == nil {
			continue
		}
		switch e := ev.(type) {
		case *kafka.Message:
			fmt.Printf("Received message: %s\n", string(e.Value))
		case kafka.Error:
			fmt.Printf("Error: %v\n", e)
		}
	}
}
Manual Offset Commit
Manual offset commit: After processing messages, the consumer needs to manually commit offsets. The advantage of this approach is precise control over offset commits, preventing duplicate message consumption or message loss. However, note that overly frequent manual offset commits can cause high CPU usage on the Broker, impacting performance. As message volume increases, high CPU consumption may affect other features of the Broker. Therefore, it is recommended to commit offsets at certain message intervals.
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":     "localhost:9092",
		"group.id":              "test-group",
		"auto.offset.reset":     "earliest",
		"enable.auto.commit":    false,
		"max.poll.interval.ms":  300000,
		"session.timeout.ms":    10000,
		"heartbeat.interval.ms": 3000,
	})
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		return
	}
	defer c.Close()
	c.SubscribeTopics([]string{"test-topic"}, nil)
	for {
		ev := c.Poll(100)
		if ev == nil {
			continue
		}
		switch e := ev.(type) {
		case *kafka.Message:
			fmt.Printf("Received message: %s\n", string(e.Value))
			// Commit after processing; in production, consider committing
			// every N messages rather than per message to reduce Broker load.
			c.CommitMessage(e)
		case kafka.Error:
			fmt.Printf("Error: %v\n", e)
		}
	}
}