Sarama Go
Last updated: 2026-01-20 17:10:13

Background

TDMQ CKafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. It provides features such as high throughput, low latency, scalability, and fault tolerance.
Sarama: A Kafka library developed by Shopify, providing functionalities such as producers, consumers, and partition consumers. This library offers excellent performance and has active community support.
Confluent-Kafka-Go: A Kafka library developed by Confluent, providing high-level APIs that are easy to use. This library is based on the librdkafka C library, offering excellent performance, but its installation and usage are slightly more complex.
This article focuses on introducing the key parameters, best practices, and common issues of the aforementioned Sarama Go client.

Producer Practices

Version Selection

When selecting the Sarama client version, it is necessary to ensure that the selected version is compatible with the Kafka broker version. The Sarama library supports multiple Kafka protocol versions, and you can specify the protocol version to use by setting config.Version. The correspondence between common Kafka protocol versions and Sarama library versions is as follows. For the latest version, see Sarama version.
| Kafka Version | Sarama Library Version | Sarama Protocol Version Constants |
| --- | --- | --- |
| 0.8.2.x | >= 1.0.0 | sarama.V0_8_2_0 |
| 0.9.0.x | >= 1.0.0 | sarama.V0_9_0_0 |
| 0.10.0.x | >= 1.0.0 | sarama.V0_10_0_0 |
| 0.10.1.x | >= 1.0.0 | sarama.V0_10_1_0 |
| 0.10.2.x | >= 1.0.0 | sarama.V0_10_2_0 |
| 0.11.0.x | >= 1.16.0 | sarama.V0_11_0_0 |
| 1.0.x | >= 1.16.0 | sarama.V1_0_0_0 |
| 1.1.x | >= 1.19.0 | sarama.V1_1_0_0 |
| 2.0.x | >= 1.19.0 | sarama.V2_0_0_0 |
| 2.1.x | >= 1.21.0 | sarama.V2_1_0_0 |
| 2.2.x | >= 1.23.0 | sarama.V2_2_0_0 |
| 2.3.x | >= 1.24.0 | sarama.V2_3_0_0 |
| 2.4.x | >= 1.27.0 | sarama.V2_4_0_0 |
| 2.5.x | >= 1.28.0 | sarama.V2_5_0_0 |
| 2.6.x | >= 1.29.0 | sarama.V2_6_0_0 |
| 2.7.x | >= 1.29.0 | sarama.V2_7_0_0 |
| 2.8.x and above | >= 1.42.1 recommended | sarama.V2_8_0_0 - sarama.V3_6_0_0 |
The Sarama library versions listed above are the minimum versions that support the corresponding Kafka protocol versions. To achieve optimal performance and utilize new features, it is recommended to use the latest version of Sarama. When using the latest version, customers can specify the protocol version compatible with their Kafka broker by setting config.Version. The configuration method is as follows. The version must be set before use; otherwise, unexpected incompatibility issues may occur:
config := sarama.NewConfig()
config.Version = sarama.V2_7_0_0 // Set the protocol version based on the actual Kafka version

Producer Parameters and Tuning

Producer Parameters

When using the Sarama Go client to write to Kafka, the following key parameters need to be configured. The relevant parameters and their default values are as follows:

config := sarama.NewConfig()
sarama.MaxRequestSize = 100 * 1024 * 1024 // Maximum request size, default 100 MB, adjustable. Writing messages larger than 100 MB will directly result in an error.
sarama.MaxResponseSize = 100 * 1024 * 1024 // Maximum response size, default 100 MB, adjustable. Obtaining messages larger than 100 MB will directly result in an error.

config.Producer.RequiredAcks = sarama.WaitForLocal // default value is sarama.WaitForLocal(1)

config.Producer.Retry.Max = 3 // Maximum number of producer retries, default is 3
config.Producer.Retry.Backoff = 100 * time.Millisecond // Backoff time between producer retries, default is 100 milliseconds

config.Producer.Return.Successes = false // Whether to return successful messages, default is false
config.Producer.Return.Errors = true // Return of failed messages, default is true

config.Producer.Compression = sarama.CompressionNone // Compression codec applied before sending, default is no compression
config.Producer.CompressionLevel = sarama.CompressionLevelDefault // Compression level; takes effect once a compression codec is configured

config.Producer.Flush.Frequency = 0 // Time duration for which messages are buffered in the producer, default is 0 milliseconds
config.Producer.Flush.Bytes = 0 // Number of buffered bytes that triggers a broker request, default is 0 (send immediately). The natural upper limit is MaxRequestSize, so the effective maximum is 100 MB
config.Producer.Flush.Messages = 0 // Best-effort number of buffered messages that triggers a broker request, default is 0 (send immediately)
config.Producer.Flush.MaxMessages = 0 // Maximum number of messages per broker request, default is 0 (no limit). When MaxMessages is set above 0, Messages must also be set, and MaxMessages >= Messages must hold

config.Producer.Timeout = 5 * time.Second // Timeout duration, default is 5 seconds

config.Producer.Idempotent = false // Whether to enable idempotency, default is false
config.Producer.Transaction.Timeout = 1 * time.Minute // Transaction timeout duration, default is 1 minute
config.Producer.Transaction.Retry.Max = 50 // Maximum number of transaction retries
config.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond
config.Net.MaxOpenRequests = 5 // Default is 5; maximum number of in-flight (unacknowledged) requests per broker connection
config.Producer.Transaction.ID = "test" // Transaction ID

config.ClientID = "your-client-id" // Client ID

Parameter Description and Tuning

About RequiredAcks Parameter Optimization

The RequiredAcks parameter controls the acknowledgment mechanism for message production. Its default value is WaitForLocal, meaning the producer returns as soon as the leader broker acknowledges the write. The RequiredAcks parameter also supports the following values:
NoResponse: Return immediately without waiting for any acknowledgment.
WaitForLocal: Return after the leader replica acknowledges the write.
WaitForAll: Return after the leader replica and all in-sync follower replicas acknowledge the write.
As indicated, in cross-AZ scenarios and for topics with a high number of replicas, the value of the RequiredAcks parameter affects message reliability and throughput. Therefore:
In scenarios involving online business messages where throughput requirements are not high, setting the RequiredAcks parameter to WaitForAll ensures that messages are returned only after being received and acknowledged by all replicas, thereby enhancing message reliability.
In scenarios such as log collection, big data, or offline computing where high throughput (i.e., the amount of data written to Kafka per second) is required, setting RequiredAcks to WaitForLocal can improve throughput.
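As a quick sketch, the two profiles above differ only in the RequiredAcks setting (a configuration fragment; broker and topic setup omitted):

```go
config := sarama.NewConfig()

// Reliability-first (online business messages): return only after all
// in-sync replicas have acknowledged the write.
config.Producer.RequiredAcks = sarama.WaitForAll

// Throughput-first (log collection, big data, offline computing):
// return after the leader alone acknowledges the write.
// config.Producer.RequiredAcks = sarama.WaitForLocal
```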

About Flush Parameter Optimization (Caching)

For the same amount of data, sending it in a single large request rather than many small ones reduces CPU and network resource consumption, thereby increasing overall write throughput. The Flush parameters can therefore be configured to optimize the client's message-sending throughput. In high-throughput scenarios, they can be sized with the following calculation:
Assuming the target Broker throughput is 16 MB/s, the average message size is 1KB, and the tolerable latency is 500 ms:
1. Calculate batch size:
Theoretical batch = 16MB/s × 0.5s = 8MB
Actual batch = min(Theoretical batch, MaxMessageBytes)
Assume MaxMessageBytes = 4MB
The recommended actual batch Bytes = 4MB
2. Calculate Frequency:
Batch fill time = 4MB / 16MB/s = 0.25s
Recommended Frequency = 0.25s × 1.2 ≈ 300ms // where 1.2 is the amplification factor, primarily used to avoid empty batches
3. Messages Settings:
Matching Messages = (4MB / 1KB) × 1.5 ≈ 6000 // where 1.5 is the amplification factor, primarily used to prevent messages from being too small
4. MaxMessages Settings:
Flush.Messages is the best-effort number of messages that accumulate in the producer instance's local memory buffer before a flush is triggered; Flush.MaxMessages is the maximum number of messages sent in a single broker request. Therefore MaxMessages must be ≥ Messages, which ensures that all messages in the buffer can be sent out.
// Standard configuration for high-throughput scenarios (when throughput ≥ 16MB/s)
config.Producer.Flush.Bytes = 4 * 1024 * 1024 // 4MB
config.Producer.Flush.Frequency = 300 * time.Millisecond
config.Producer.Flush.Messages = 6000 // To prevent backlog of small messages, can be set to a higher value, e.g. 10000
config.Producer.Flush.MaxMessages = 6000 // Referencing the Messages parameter, can be set to a higher value, e.g. 10000. Must ensure MaxMessages ≥ Messages

Optimizing Transaction Parameters


config.Producer.Idempotent = true // Idempotence must be enabled in transactional scenarios
config.Producer.RequiredAcks = sarama.WaitForAll // Idempotence requires acknowledgment from all in-sync replicas
config.Producer.Transaction.Timeout = 1 * time.Minute // Transaction timeout duration, default is 1 minute
config.Producer.Transaction.Retry.Max = 50 // Maximum number of transaction retries
config.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond
config.Net.MaxOpenRequests = 1 // Sarama requires this to be 1 when Idempotent is true
config.Producer.Transaction.ID = "test" // Transaction ID

It should be emphasized that transactions incur additional computational resources to ensure exactly-once semantics for messages. The ProducerStateManager instance on the broker side caches the most recent 5 batches of data sent by each PID per topic-partition, which is why the Java client allows up to 5 in-flight requests with idempotence enabled; Sarama's idempotent producer, however, requires config.Net.MaxOpenRequests to be 1. At the same time, appropriately increase the transaction timeout duration to tolerate latency caused by network jitter under high load.
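A minimal transactional send might look like the following sketch (assuming Sarama >= v1.37.0, which added the BeginTxn/CommitTxn/AbortTxn APIs; the broker address, topic, and transaction ID are placeholders):

```go
config := sarama.NewConfig()
config.Version = sarama.V2_7_0_0
config.Producer.Idempotent = true
config.Producer.RequiredAcks = sarama.WaitForAll // idempotence requires acks from all in-sync replicas
config.Net.MaxOpenRequests = 1                   // required by Sarama's idempotent producer
config.Producer.Transaction.ID = "test"
config.Producer.Return.Successes = true // required for SyncProducer

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
	log.Fatal(err)
}
defer producer.Close()

// Begin a transaction, send inside it, then commit; abort on failure.
if err := producer.BeginTxn(); err != nil {
	log.Fatal(err)
}
if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
	Topic: "test",
	Value: sarama.StringEncoder("txn message"),
}); err != nil {
	_ = producer.AbortTxn() // roll the transaction back on send failure
	log.Fatal(err)
}
if err := producer.CommitTxn(); err != nil {
	log.Fatal(err)
}
```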

About Compression Parameter Optimization

Sarama Go supports the following compression parameters:
config.Producer.Compression = sarama.CompressionNone // Compression codec applied before sending, default is no compression
config.Producer.CompressionLevel = sarama.CompressionLevelDefault // Compression level; takes effect once a compression codec is configured
The Sarama Kafka Go client supports the following compression configurations:
1. sarama.CompressionNone: No compression is used.
2. sarama.CompressionGZIP: Uses GZIP compression.
3. sarama.CompressionSnappy: Uses Snappy compression.
4. sarama.CompressionLZ4: Uses LZ4 compression.
5. sarama.CompressionZSTD: Uses ZSTD compression.
To use compressed messages in the Sarama Kafka Go client, set the config.Producer.Compression parameter when creating a producer. For example, to use the LZ4 compression algorithm, set config.Producer.Compression to sarama.CompressionLZ4. Although message compression and decompression occur on the client side—an optimization trading computation for bandwidth—Brokers incur additional computational costs for validating compressed messages. This is particularly significant with GZIP compression, where validation costs can be substantial. In some cases, the gains may not justify the costs, as increased computation can reduce the Broker's message processing capacity, ultimately lowering bandwidth throughput. For low-throughput or low-specification services, using compressed messages is not recommended. If compression is still required, consider the following approach:
1. In the Producer, messages are independently compressed to generate compressed data packets: messageCompression, while storing the compression method in the message's key:
{"Compression","CompressionLZ4"}
2. In the Producer, treat messageCompression as a normal message for sending.
3. On the Consumer side, read the message key to obtain the compression method used for compression and independently decompress the data.
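The three steps above can be sketched with the standard library's gzip codec standing in for LZ4 (the helper names and the JSON key format are illustrative, not part of the Sarama API):

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// compressPayload gzips a message body on the producer side; the caller
// records the codec name in the Kafka message key so consumers know how
// to decompress.
func compressPayload(value []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(value); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompressPayload reverses compressPayload on the consumer side after
// reading the codec name from the message key.
func decompressPayload(compressed []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	original := []byte("Hello, CKafka!")
	// Producer side: compress the payload and tag the codec in the key.
	key := []byte(`{"Compression":"CompressionGZIP"}`)
	compressed, err := compressPayload(original)
	if err != nil {
		panic(err)
	}
	// ... send a ProducerMessage with Key: key and Value: compressed here ...
	_ = key

	// Consumer side: inspect the key, then decompress independently.
	restored, err := decompressPayload(compressed)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(restored)) // prints: Hello, CKafka!
}
```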

Memory Optimization for Compression Parameters

When using the Go Sarama client with LZ4 compression, messages require server-side decompression for validation. The Sarama client's default LZ4 configuration requests more memory than the official Java client, leading to rapid memory consumption. Prolonged memory allocation times cause Kafka processing threads to stall while waiting for partition lock release during production requests. This can saturate the request queue, increasing production latency and CPU utilization, ultimately impacting Kafka cluster performance. Therefore, customers are advised to optimize the following configurations:
Go Sarama client: the default memory allocation for LZ4 message decompression is 4 MB, so as request volume grows, the impact on shared clusters is significant. It is therefore recommended to configure it to 64 KB, matching the Java client, whose default memory allocation for LZ4 message decompression is 64 KB.
Example:
lz4.DefaultBlockSizeOption = lz4.BlockSizeOption(lz4.Block64Kb)

Create Producer Instance

If the application requires higher throughput, an asynchronous producer can be used to increase message delivery speed. Simultaneously, messages can be sent in batches to reduce network overhead and IO consumption. If the application demands higher reliability, a synchronous producer should be used to ensure successful message delivery. Additionally, the ACK confirmation mechanism and transaction mechanism can be employed to guarantee message reliability and consistency. For specific parameter tuning, refer to Producer Parameters and Tuning.

Synchronous Producer

In the Sarama Kafka Go client, there are two types of producers: synchronous producers and asynchronous producers. They differ primarily in how messages are sent and how message results are handled. Synchronous Producer: A synchronous producer blocks the current thread when sending a message until the message delivery is complete and server acknowledgment is received. Consequently, synchronous producers offer lower throughput but allow you to immediately determine whether a message was successfully sent. Example:
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForLocal
	config.Producer.Return.Successes = true // required by NewSyncProducer
	config.Producer.Return.Errors = true

	brokers := []string{"localhost:9092"}
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		log.Fatalf("Failed to create producer: %v", err)
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{
		Topic: "test",
		Value: sarama.StringEncoder("Hello, World!"),
	}

	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Printf("Failed to send message: %v", err)
	} else {
		fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
	}
}

Asynchronous Producer

Asynchronous Producer: An asynchronous producer does not block the current thread when sending messages. Instead, it places messages into an internal send queue and returns immediately. Consequently, it offers higher throughput but requires callback functions to handle message results.
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForLocal
	config.Producer.Return.Successes = true // required to read from the Successes() channel
	config.Producer.Return.Errors = true

	brokers := []string{"localhost:9092"}
	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		log.Fatalf("Failed to create producer: %v", err)
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{
		Topic: "test",
		Value: sarama.StringEncoder("Hello, World!"),
	}

	producer.Input() <- msg

	select {
	case success := <-producer.Successes():
		fmt.Printf("Message sent to partition %d at offset %d\n", success.Partition, success.Offset)
	case err := <-producer.Errors():
		log.Printf("Failed to send message: %v", err)
	}
}

Consumer Practices

Version Selection

When selecting the Sarama client version, ensure that the selected version is compatible with the Kafka broker version. The Sarama library supports multiple Kafka protocol versions, and you can specify the protocol version to use by setting config.Version.
config := sarama.NewConfig()
config.Version = sarama.V2_8_2_0

Consumer Parameters and Tuning


config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRange() // Partition assignment strategy for the consumer group; range is the default
config.Consumer.Offsets.Initial = sarama.OffsetNewest // Offset to start from when there is no committed offset: newest or oldest. Default is the newest message offset
config.Consumer.Offsets.AutoCommit.Enable = true // Whether to enable automatic offset commit. Enabled by default
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // Auto-commit interval for offsets, default is 1s
config.Consumer.MaxWaitTime = 250 * time.Millisecond // How long the broker waits when there are no new messages to consume, default is 250 ms
config.Consumer.MaxProcessingTime = 100 * time.Millisecond // Timeout for processing a single message, default is 100 ms
config.Consumer.Fetch.Min = 1 // Minimum number of message bytes the broker must have available before responding to a fetch request. Default is 1; setting it to 0 would cause the consumer to spin when no messages are available
config.Consumer.Fetch.Max = 0 // Maximum bytes that can be fetched in a single request. Default is 0, indicating no limit
config.Consumer.Fetch.Default = 1024 * 1024 // Default number of message bytes per fetch request (default 1 MB). Should be larger than most messages in the instance; otherwise, the broker will spend significant time determining whether the fetched data meets this value
config.Consumer.Return.Errors = true

config.Consumer.Group.Rebalance.Timeout = 60 * time.Second // Timeout for rebalance operations. Default is 60s
config.Consumer.Group.Session.Timeout = 10 * time.Second // Timeout for consumer group sessions. Default is 10s
config.Consumer.Group.Heartbeat.Interval = 3 * time.Second // Heartbeat interval, default is 3s

Parameter Description and Tuning

Common consumption issues primarily involve frequent rebalances and consumer thread blocking. Refer to the following parameters for tuning:
config.Consumer.Group.Session.Timeout: For versions prior to v0.10.2, this parameter value can be appropriately increased. It should exceed the time to consume a batch of data but must not exceed 30s (25s is recommended). For v0.10.2 and later versions, retain the default value of 10s.
config.Consumer.Group.Heartbeat.Interval: Defaults to 3s. This value must be set to less than Consumer.Group.Session.Timeout/3.
config.Consumer.Group.Rebalance.Timeout: Default is 60s. If there are a large number of partitions and consumers, it is recommended to appropriately increase this value.
config.Consumer.MaxProcessingTime: This value must be greater than <max.poll.records> / (<records consumed per second per thread> * <number of consumer threads>).
Note:
Increase the MaxProcessingTime duration based on requirements.
Monitor requests with processing times greater than MaxProcessingTime, and sample and print the timeout durations.

Create Consumer Instance

Sarama provides a subscription-based model for creating consumers, offering two methods for offset commits: manual offset commit and automatic offset commit.

Automatic Offset Commit

Automatic offset commit: After polling messages, the consumer automatically commits offsets without manual intervention. This approach offers simplicity and ease of use but may lead to duplicate message consumption or message loss.
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_1_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Offsets.AutoCommit.Enable = true
	config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second

	brokers := []string{"localhost:9092"}
	topic := "test-topic"

	client, err := sarama.NewConsumerGroup(brokers, "test-group", config)
	if err != nil {
		log.Fatalf("unable to create kafka consumer group: %v", err)
	}
	defer client.Close()

	ctx, cancel := context.WithCancel(context.Background())
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		defer wg.Done()

		for {
			err := client.Consume(ctx, []string{topic}, &consumerHandler{})
			if err != nil {
				log.Printf("consume error: %v", err)
			}

			select {
			case <-signals:
				cancel()
				return
			default:
			}
		}
	}()

	wg.Wait()
}

type consumerHandler struct{}

func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

func (h *consumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("Received message: key=%s, value=%s, partition=%d, offset=%d\n", string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "")
	}
	return nil
}

Manual Offset Commit

Manual offset commit: After processing messages, the consumer needs to manually commit offsets. The advantage of this approach is precise control over offset commits, preventing duplicate message consumption or message loss. However, note that overly frequent manual offset commits can cause high CPU usage on the Broker, impacting performance. As message volume increases, high CPU consumption may affect other features of the Broker. Therefore, it is recommended to commit offsets at certain message intervals.
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_1_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Offsets.AutoCommit.Enable = false

	brokers := []string{"localhost:9092"}
	topic := "test-topic"

	client, err := sarama.NewConsumerGroup(brokers, "test-group", config)
	if err != nil {
		log.Fatalf("unable to create kafka consumer group: %v", err)
	}
	defer client.Close()

	ctx, cancel := context.WithCancel(context.Background())
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		defer wg.Done()

		for {
			err := client.Consume(ctx, []string{topic}, &consumerHandler{})
			if err != nil {
				log.Printf("consume error: %v", err)
			}

			select {
			case <-signals:
				cancel()
				return
			default:
			}
		}
	}()

	wg.Wait()
}

type consumerHandler struct{}

func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

func (h *consumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("Received message: key=%s, value=%s, partition=%d, offset=%d\n", string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "")
		sess.Commit() // Committing after every message is for demonstration; in production, commit every N messages to reduce broker CPU load
	}
	return nil
}

Sarama Go Producer and Consumer Common Issues

1. Manual offset commit is configured, but the offsets are not updated when querying the consumer group in the console.
Regardless of whether manual or automatic offset commit is configured, it is necessary to first mark the message using `sess.MarkMessage(msg, "")` to indicate that the message has been consumed before committing the offset.
2. The Sarama Go client has the following known problems as a consumer:
2.1 When new partitions are added to a Topic, the Sarama Go client cannot detect and consume from these newly added partitions. The client must be restarted in order to consume from the new partitions.
2.2 When the Sarama Go client simultaneously subscribes to more than two Topics, it may cause some partitions to fail to consume messages properly.
2.3 When the Sarama Go client's offset reset policy is set to Oldest (earliest), if the client crashes or the server undergoes a version upgrade, the self-implemented OutOfRange mechanism in the Sarama Go client may cause it to re-consume all messages starting from the earliest offset.
2.4 To work around these issues, you can switch to the Confluent Go client; a demo is available at kafka-confluent-go-Demo.
3. Error: Failed to produce message to topic.
The issue may be caused by a version mismatch. First determine the Kafka broker version, then specify it explicitly:
config := sarama.NewConfig()
config.Version = sarama.V2_1_0_0
