Release Notes
Broker Release Notes
Announcement
// Minimal producer configuration: broker address plus explicit
// API-version probing (api.version.request).
config := &kafka.ConfigMap{
	"bootstrap.servers":   "localhost",
	"api.version.request": true,
}
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// main builds a producer ConfigMap listing the commonly tuned librdkafka
// producer options (with their documented defaults), creates the producer,
// and closes it.
//
// Fixes vs. the collapsed original: a trailing comma was missing after the
// last map entry, and the single-line layout let `//` comments swallow the
// closing braces, so the snippet could not compile as shown.
func main() {
	config := &kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"acks":              "-1",      // acks mode, default value is -1
		"client.id":         "rdkafka", // Client ID
		"compression.type":  "none",    // Specifies the compression method
		"compression.level": "-1",      // Compression level
		// By default, a batch aggregates up to 10,000 messages to form a
		// MessageSet for batch sending, improving performance.
		"batch.num.messages": "10000",
		// The batch size limitation for a MessageSet, default limited to a
		// maximum of 1000000 bytes.
		"batch.size": "1000000",
		// Delays batching messages for 5 ms by default before constructing
		// MessageSets to be transmitted to the Broker.
		"queue.buffering.max.ms": "5",
		// The total number of messages in Producer batch sending cannot
		// exceed 100000.
		"queue.buffering.max.messages": "100000",
		// Total size cap (in KB) of messages queued for batch sending —
		// original comment was truncated; presumably 1048576 KB. TODO confirm.
		"queue.buffering.max.kbytes": "1048576",
		"message.send.max.retries":   "2147483647", // Retry count, default 2147483647
		"retry.backoff.ms":           "100",        // Retry interval, default 100ms
		"socket.timeout.ms":          "60000",      // Session timeout duration, default is 60s
		// The maximum number of requests that can be sent per connection is 1000000.
		"max.in.flight.requests.per.connection": "1000000",
		// Alias for max.in.flight.requests.per.connection, allowing up to
		// 1000000 requests to be sent on a single connection.
		// (Fixed: the original was missing the trailing comma here.)
		"max.in.flight": "1000000",
	}

	producer, err := kafka.NewProducer(config)
	if err != nil {
		panic(fmt.Sprintf("Failed to create producer: %s", err))
	}

	// Use the producer to send messages and perform other operations...

	// Close the producer
	producer.Close()
}
max.in.flight.requests.per.connection: 5
max.in.flight: 5
{"Compression","CompressionLZ4"}
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// main creates a Kafka producer, sends ten "hello world N" messages to
// test-topic, flushes outstanding deliveries, and closes the producer.
//
// Fix vs. the original: the Printf format strings used "\\n", which prints a
// literal backslash-n instead of a newline.
func main() {
	// Configure Kafka Producer
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers":                     "localhost:9092",
		"acks":                                  "1",
		"compression.type":                      "none",
		"batch.num.messages":                    "1000",
		"max.in.flight.requests.per.connection": "5",
		"max.in.flight":                         "5",
	})
	if err != nil {
		fmt.Printf("Failed to create producer: %s\n", err)
		return
	}

	// Send messages.
	for i := 0; i < 10; i++ {
		topic := "test-topic"
		value := fmt.Sprintf("hello world %d", i)
		message := &kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(value),
		}
		// NOTE(review): Produce's error return is ignored here; a real
		// application should check it (e.g. for a full local queue).
		p.Produce(message, nil)
	}

	// Flush pending deliveries (up to 15 s), then close the producer.
	p.Flush(15 * 1000)
	p.Close()
}
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// main creates a consumer with the commonly tuned fetch/session options,
// subscribes to test-topic, and commits offsets manually per message.
//
// Fixes vs. the original: enable.auto.commit was true while the loop also
// called CommitMessage, committing every offset twice — manual commit
// requires auto-commit off. The Printf format strings also used "\\n"
// (literal backslash-n) instead of a newline.
func main() {
	// Configure Kafka Consumer
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "test-group",
		"auto.offset.reset": "earliest",
		"fetch.min.bytes":   1,        // Minimum fetch size in bytes
		"fetch.max.bytes":   52428800, // Maximum fetch size in bytes
		"fetch.wait.max.ms": "500",    // If there is no new message to consume, wait for 500 ms by default
		// Disabled so offsets are committed only by the explicit
		// CommitMessage call below (the librdkafka default is true).
		"enable.auto.commit":      false,
		"auto.commit.interval.ms": 5000,   // Auto-commit interval for offsets, default is 5s
		"max.poll.interval.ms":    300000, // Maximum delay between two poll operations. Default is 5 minutes.
		"session.timeout.ms":      45000,  // Session timeout, default is 45s
		"heartbeat.interval.ms":   3000,   // Heartbeat interval, default is 3s
	})
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		return
	}

	// Subscribe to a Topic.
	c.SubscribeTopics([]string{"test-topic"}, nil)

	// Poll and manually commit the offset of each processed message.
	for {
		ev := c.Poll(100)
		if ev == nil {
			continue
		}
		switch e := ev.(type) {
		case *kafka.Message:
			fmt.Printf("Received message: %s\n", string(e.Value))
			c.CommitMessage(e)
		case kafka.Error:
			fmt.Printf("Error: %v\n", e)
		}
	}

	// Close the Kafka Consumer.
	// NOTE(review): unreachable after the infinite loop above; kept to match
	// the original example shape.
	c.Close()
}
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// main demonstrates a consumer that relies on automatic offset commits:
// offsets are committed in the background every auto.commit.interval.ms,
// so the poll loop never commits explicitly.
//
// Fixes vs. the original: the closing brace of main was missing, and the
// Printf format strings used "\\n" (literal backslash-n) instead of a newline.
func main() {
	// Configure Kafka Consumer
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "test-group",
		"auto.offset.reset": "earliest",
		// Enable automatic offset commit.
		"enable.auto.commit": true,
		// Interval for automatic offset commit: 5000 ms (5 s).
		"auto.commit.interval.ms": 5000,
		// Maximum wait between poll operations: 300000 ms (5 minutes).
		"max.poll.interval.ms": 300000,
		// Session timeout between the Consumer and broker: 10 s.
		"session.timeout.ms": 10000,
		// Interval at which the Consumer sends heartbeats: 3000 ms (3 s).
		"heartbeat.interval.ms": 3000,
	})
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		return
	}

	// Subscribe to a Topic.
	c.SubscribeTopics([]string{"test-topic"}, nil)

	// Automatic offset commit: just consume; no explicit commits needed.
	for {
		ev := c.Poll(100)
		if ev == nil {
			continue
		}
		switch e := ev.(type) {
		case *kafka.Message:
			fmt.Printf("Received message: %s\n", string(e.Value))
		case kafka.Error:
			fmt.Printf("Error: %v\n", e)
		}
	}

	// Close the Kafka Consumer.
	// NOTE(review): unreachable after the infinite loop above; kept to match
	// the original example shape.
	c.Close()
}
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// main demonstrates a consumer with automatic offset commit disabled:
// each processed message's offset is committed explicitly via CommitMessage.
//
// Fixes vs. the original: the closing brace of main was missing, and the
// Printf format strings used "\\n" (literal backslash-n) instead of a newline.
func main() {
	// Configure Kafka Consumer
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "test-group",
		"auto.offset.reset": "earliest",
		// Manual commit mode: the application commits offsets itself.
		"enable.auto.commit":    false,
		"max.poll.interval.ms":  300000,
		"session.timeout.ms":    10000,
		"heartbeat.interval.ms": 3000,
	})
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		return
	}

	// Subscribe to a Topic.
	c.SubscribeTopics([]string{"test-topic"}, nil)

	// Poll and manually commit the offset of each processed message.
	for {
		ev := c.Poll(100)
		if ev == nil {
			continue
		}
		switch e := ev.(type) {
		case *kafka.Message:
			fmt.Printf("Received message: %s\n", string(e.Value))
			c.CommitMessage(e)
		case kafka.Error:
			fmt.Printf("Error: %v\n", e)
		}
	}

	// Close the Kafka Consumer.
	// NOTE(review): unreachable after the infinite loop above; kept to match
	// the original example shape.
	c.Close()
}
Was this page helpful?
You can also Contact sales or Submit a Ticket for help.
Help us improve! Rate your documentation experience in 5 mins.
Feedback