Kafka version | Sarama Library Version | Sarama Protocol Version Constants |
0.8.2.x | >= 1.0.0 | sarama.V0_8_2_0 |
0.9.0.x | >= 1.0.0 | sarama.V0_9_0_0 |
0.10.0.x | >= 1.0.0 | sarama.V0_10_0_0 |
0.10.1.x | >= 1.0.0 | sarama.V0_10_1_0 |
0.10.2.x | >= 1.0.0 | sarama.V0_10_2_0 |
0.11.0.x | >= 1.16.0 | sarama.V0_11_0_0 |
1.0.x | >= 1.16.0 | sarama.V1_0_0_0 |
1.1.x | >= 1.19.0 | sarama.V1_1_0_0 |
2.0.x | >= 1.19.0 | sarama.V2_0_0_0 |
2.1.x | >= 1.21.0 | sarama.V2_1_0_0 |
2.2.x | >= 1.23.0 | sarama.V2_2_0_0 |
2.3.x | >= 1.24.0 | sarama.V2_3_0_0 |
2.4.x | >= 1.27.0 | sarama.V2_4_0_0 |
2.5.x | >= 1.28.0 | sarama.V2_5_0_0 |
2.6.x | >= 1.29.0 | sarama.V2_6_0_0 |
2.7.x | >= 1.29.0 | sarama.V2_7_0_0 |
2.8.x and above | >= 1.42.1 is recommended (note: sarama versions >= 1.40 are published as github.com/IBM/sarama) | sarama.V2_8_0_0 through sarama.V3_6_0_0 |
// Pin the wire-protocol version to match the actual Kafka broker version.
config := sarama.NewConfig()
config.Version = sarama.V2_7_0_0
config := sarama.NewConfig()sarama.MaxRequestSize = 100 * 1024 * 1024 // Maximum request size, default 100 MB, adjustable. Writing messages larger than 100 MB will directly result in an error.sarama.MaxResponseSize = 100 * 1024 * 1024 // Maximum response size, default 100 MB, adjustable. Obtaining messages larger than 100 MB will directly result in an error.config.Producer.RequiredAcks = sarama.WaitForLocal // default value is sarama.WaitForLocal(1)config.Producer.Retry.Max = 3 // Maximum number of producer retries, default is 3config.Producer.Retry.Backoff = 100 * time.Millisecond // Backoff time between producer retries, default is 100 millisecondsconfig.Producer.Return.Successes = false // Whether to return successful messages, default is falseconfig.Producer.Return.Errors = true // Return of failed messages, default is trueconfig.Producer.Compression = CompressionNone // Whether messages are compressed before sending, default is no compressionconfig.Producer.CompressionLevel = CompressionLevelDefault // Specify the compression level, which takes effect after a compression algorithm is configuredconfig.Producer.Flush.Frequency = 0 // Time duration for which messages are buffered in the producer, default is 0 millisecondsconfig.Producer.Flush.Bytes = 0 // The number of bytes that triggers a broker request, default is 0 (send immediately). The natural upper limit is MaxRequestSize, so the default maximum is 100 MBconfig.Producer.Flush.Messages = 0 // Number of messages that forces a broker request when reached; this is the upper limitconfig.Producer.Flush.MaxMessages = 0 // Maximum number of messages to buffer, default is 0 (send immediately). 
When MaxMessages is set above 0, Messages must be set, and MaxMessages ≥ Messages must holdconfig.Producer.Timeout = 5 * time.Second // Timeout duration, default is 5 secondsconfig.Producer.Idempotent = false // Whether to enable idempotency, default is falseconfig.Producer.Transaction.Timeout = 1 * time.Minute // Transaction timeout duration, default is 1 minuteconfig.Producer.Transaction.Retry.Max = 50 // Maximum number of transaction retriesconfig.Producer.Transaction.Retry.Backoff = 100 * time.Millisecondconfig.Net.MaxOpenRequests = 5 // Default is 5, number of requests sent at onceconfig.Producer.Transaction.ID = "test" // Transaction IDconfig.ClientID = "your-client-id" // Client ID
// Standard configuration for high-throughput scenarios (when throughput ≥ 16MB/s)config.Producer.Flush.Bytes = 4 * 1024 * 1024 // 4MBconfig.Producer.Flush.Frequency = 300 * time.Millisecondconfig.Producer.Flush.Messages = 6000 // To prevent backlog of small messages, can be set to a higher value, e.g. 10000config.Producer.Flush.MaxMessages = 6000 // Referencing the Messages parameter, can be set to a higher value, e.g. 10000. Must ensure MaxMessages ≥ Messages
// Transactional producer configuration. Idempotency is mandatory for transactions.
config.Producer.Idempotent = true // Must be true in transactional scenarios.
config.Producer.Transaction.Timeout = 1 * time.Minute              // Transaction timeout, default is 1 minute.
config.Producer.Transaction.Retry.Max = 50                         // Maximum number of transaction retries.
config.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond // Backoff between transaction retries.
// FIX: sarama's Config.Validate() rejects an idempotent producer unless
// Net.MaxOpenRequests is 1 (required to preserve ordering guarantees).
config.Net.MaxOpenRequests = 1
config.Producer.Transaction.ID = "test" // Transaction ID.
// Compression settings.
// FIX: the constants must be qualified with the sarama package name;
// the bare identifiers CompressionNone / CompressionLevelDefault do not compile.
config.Producer.Compression = sarama.CompressionNone              // Compression applied before sending; default is none.
config.Producer.CompressionLevel = sarama.CompressionLevelDefault // Compression level; effective only once an algorithm is configured.
{"Compression": "CompressionLZ4"}
// NOTE(review): intended to set the default LZ4 block size to 64 KB. Confirm this
// symbol against the pierrec/lz4 version in use — in lz4 v4, lz4.BlockSizeOption(lz4.Block64Kb)
// returns an Option that is applied via Writer.Apply; a package-level
// DefaultBlockSizeOption variable may not exist in your version.
lz4.DefaultBlockSizeOption = lz4.BlockSizeOption(lz4.Block64Kb)
package mainimport ("fmt""log""github.com/Shopify/sarama")func main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForLocalconfig.Producer.Return.Errors = truebrokers := []string{"localhost:9092"}producer, err := sarama.NewSyncProducer(brokers, config)if err != nil {log.Fatalf("Failed to create producer: %v", err)}defer producer.Close()msg := &sarama.ProducerMessage{Topic: "test",Value: sarama.StringEncoder("Hello, World!"),}partition, offset, err := producer.SendMessage(msg)if err != nil {log.Printf("Failed to send message: %v", err)} else {fmt.Printf("Message sent to partition %d at offset %d\\n", partition, offset)}}
package mainimport ("fmt""log""github.com/Shopify/sarama")func main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForLocalconfig.Producer.Return.Errors = truebrokers := []string{"localhost:9092"}producer, err := sarama.NewAsyncProducer(brokers, config)if err != nil {log.Fatalf("Failed to create producer: %v", err)}defer producer.Close()msg := &sarama.ProducerMessage{Topic: "test",Value: sarama.StringEncoder("Hello, World!"),}producer.Input() <- msgselect {case success := <-producer.Successes():fmt.Printf("Message sent to partition %d at offset %d\\n", success.Partition, success.Offset)case err := <-producer.Errors():log.Printf("Failed to send message: %v", err)}}
// Pin the wire-protocol version to Kafka 2.8.2.
config := sarama.NewConfig()
config.Version = sarama.V2_8_2_0
// Full consumer configuration reference with sarama defaults.
config := sarama.NewConfig()

// FIX: NewBalanceStrategyRange is a constructor and must be called; assigning the
// function value itself does not compile. (The original also set this field twice;
// the duplicate assignment has been removed.)
config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRange() // Partition assignment strategy; range is the default.

config.Consumer.Offsets.Initial = sarama.OffsetNewest            // With no committed offset, start from the newest (default) or oldest message.
config.Consumer.Offsets.AutoCommit.Enable = true                 // Automatic offset commit; enabled by default.
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second    // Auto-commit interval, default is 1 s.
config.Consumer.MaxWaitTime = 250 * time.Millisecond             // Client wait time when no new messages are available, default 250 ms.
config.Consumer.MaxProcessingTime = 100 * time.Millisecond       // Timeout for message processing, default 100 ms. (Original set this twice.)
config.Consumer.Fetch.Min = 1                                    // Minimum bytes per fetch; the broker waits for at least this much. Default 1; must not be 0 or the consumer spins when idle.
config.Consumer.Fetch.Max = 0                                    // Maximum bytes per fetch request. Default 0 = unlimited.
config.Consumer.Fetch.Default = 1024 * 1024                      // Default fetch size (1 MB). Should exceed most message sizes, or the broker wastes time satisfying it.
config.Consumer.Return.Errors = true
config.Consumer.Group.Rebalance.Timeout = 60 * time.Second       // Rebalance operation timeout, default 60 s.
config.Consumer.Group.Session.Timeout = 10 * time.Second         // Consumer group session timeout, default 10 s.
config.Consumer.Group.Heartbeat.Interval = 3 * time.Second       // Heartbeat interval, default 3 s.
package mainimport ("context""fmt""log""os""os/signal""sync""time""github.com/Shopify/sarama")func main() {config := sarama.NewConfig()config.Version = sarama.V2_1_0_0config.Consumer.Offsets.Initial = sarama.OffsetOldestconfig.Consumer.Offsets.AutoCommit.Enable = trueconfig.Consumer.Offsets.AutoCommit.Interval = 1 * time.Secondbrokers := []string{"localhost:9092"}topic := "test-topic"client, err := sarama.NewConsumerGroup(brokers, "test-group", config)if err != nil {log.Fatalf("unable to create kafka consumer group: %v", err)}defer client.Close()ctx, cancel := context.WithCancel(context.Background())signals := make(chan os.Signal, 1)signal.Notify(signals, os.Interrupt)var wg sync.WaitGroupwg.Add(1)go func() {defer wg.Done()for {err := client.Consume(ctx, []string{topic}, &consumerHandler{})if err != nil {log.Printf("consume error: %v", err)}select {case <-signals:cancel()returndefault:}}}()wg.Wait()}type consumerHandler struct{}func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {return nil}func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {return nil}func (h *consumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for msg := range claim.Messages() {fmt.Printf("Received message: key=%s, value=%s, partition=%d, offset=%d\\n", string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)sess.MarkMessage(msg, "")}return nil}
package mainimport ("context""fmt""log""os""os/signal""sync""github.com/Shopify/sarama")func main() {config := sarama.NewConfig()config.Version = sarama.V2_1_0_0config.Consumer.Offsets.Initial = sarama.OffsetOldestconfig.Consumer.Offsets.AutoCommit.Enable = falsebrokers := []string{"localhost:9092"}topic := "test-topic"client, err := sarama.NewConsumerGroup(brokers, "test-group", config)if err != nil {log.Fatalf("unable to create kafka consumer group: %v", err)}defer client.Close()ctx, cancel := context.WithCancel(context.Background())signals := make(chan os.Signal, 1)signal.Notify(signals, os.Interrupt)var wg sync.WaitGroupwg.Add(1)go func() {defer wg.Done()for {err := client.Consume(ctx, []string{topic}, &consumerHandler{})if err != nil {log.Printf("consume error: %v", err)}select {case <-signals:cancel()returndefault:}}}()wg.Wait()}type consumerHandler struct{}func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {return nil}func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {return nil}func (h *consumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for msg := range claim.Messages() {fmt.Printf("Received message: key=%s, value=%s, partition=%d, offset=%d\\n", string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)sess.MarkMessage(msg, "")sess.Commit()}return nil}
// Pin the wire-protocol version to Kafka 2.1.0.
config := sarama.NewConfig()
config.Version = sarama.V2_1_0_0
Feedback