config := &kafka.ConfigMap{
    "bootstrap.servers":   "localhost",
    "api.version.request": true,
}
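A minimal sketch of using this ConfigMap is to pass it straight to kafka.NewProducer and check the returned error (errors here are typically configuration problems; broker connectivity issues surface later as events):

producer, err := kafka.NewProducer(config)
if err != nil {
    panic(fmt.Sprintf("Failed to create producer: %s", err))
}
defer producer.Close()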
package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    config := &kafka.ConfigMap{
        "bootstrap.servers":            "localhost:9092",
        "acks":                         -1,         // acknowledgment mode; the default is -1 (wait for all in-sync replicas)
        "client.id":                    "rdkafka",  // client ID
        "compression.type":             "none",     // compression codec
        "compression.level":            -1,         // compression level
        "batch.num.messages":           10000,      // by default at most 10000 messages are aggregated into one MessageSet and sent as a batch, improving throughput
        "batch.size":                   1000000,    // size limit of one MessageSet; the default is 1000000 bytes
        "queue.buffering.max.ms":       5,          // delay (default 5 ms) to accumulate messages before building MessageSets and sending them to the broker
        "queue.buffering.max.messages": 100000,     // total number of messages queued on the producer must not exceed 100000
        "queue.buffering.max.kbytes":   1048576,    // total size (in KB) of MessageSets queued on the producer; the default is 1048576
        "message.send.max.retries":     2147483647, // number of retries; the default is 2147483647
        "retry.backoff.ms":             100,        // backoff between retries; the default is 100 ms
        "socket.timeout.ms":            60000,      // network request socket timeout; the default is 60 s
    }
    producer, err := kafka.NewProducer(config)
    if err != nil {
        panic(fmt.Sprintf("Failed to create producer: %s", err))
    }

    // Use the producer to send messages, etc.
    // ...

    // Close the producer
    producer.Close()
}
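Because sends are asynchronous, delivery results for the producer above arrive on its Events() channel; a minimal sketch of draining that channel in a background goroutine (not part of the original example) could look like this:

// Drain delivery reports in the background so the internal queue does not fill up.
go func() {
    for e := range producer.Events() {
        switch ev := e.(type) {
        case *kafka.Message:
            if ev.TopicPartition.Error != nil {
                fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error)
            } else {
                fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
            }
        case kafka.Error:
            fmt.Printf("Producer error: %v\n", ev)
        }
    }
}()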
{"Compression","CompressionLZ4"}
package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // Configure the Kafka producer
    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers":  "localhost:9092",
        "acks":               "1",
        "compression.type":   "none",
        "batch.num.messages": "1000",
    })
    if err != nil {
        fmt.Printf("Failed to create producer: %s\n", err)
        return
    }

    // Send messages
    for i := 0; i < 10; i++ {
        topic := "test-topic"
        value := fmt.Sprintf("hello world %d", i)
        message := &kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(value),
        }
        if err := p.Produce(message, nil); err != nil {
            fmt.Printf("Failed to enqueue message: %s\n", err)
        }
    }

    // Flush outstanding messages and close the Kafka producer
    p.Flush(15 * 1000)
    p.Close()
}
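If a send needs to be confirmed before continuing, Produce also accepts a per-message delivery channel; the following sketch reuses p and the test-topic topic from the example above:

deliveryChan := make(chan kafka.Event, 1)
topic := "test-topic"
msg := &kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          []byte("hello world"),
}
if err := p.Produce(msg, deliveryChan); err != nil {
    fmt.Printf("Produce failed: %v\n", err)
} else {
    // Block until the delivery report for this single message arrives.
    e := <-deliveryChan
    m := e.(*kafka.Message)
    if m.TopicPartition.Error != nil {
        fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
    } else {
        fmt.Printf("Delivered to %v\n", m.TopicPartition)
    }
}
close(deliveryChan)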
package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // Configure the Kafka consumer
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":       "localhost:9092",
        "group.id":                "test-group",
        "auto.offset.reset":       "earliest",
        "fetch.min.bytes":         1,        // minimum number of bytes per fetch
        "fetch.max.bytes":         52428800, // maximum number of bytes per fetch
        "fetch.wait.max.ms":       "500",    // wait up to 500 ms (default) when no new messages are available
        "enable.auto.commit":      true,     // whether offsets are committed automatically; the default is true
        "auto.commit.interval.ms": 5000,     // interval for automatic offset commits; the default is 5 s
        "max.poll.interval.ms":    300000,   // maximum allowed interval between two poll calls; the default is 5 minutes
        "session.timeout.ms":      45000,    // session timeout; the default is 45 s
        "heartbeat.interval.ms":   3000,     // heartbeat interval; the default is 3 s
    })
    if err != nil {
        fmt.Printf("Failed to create consumer: %s\n", err)
        return
    }

    // Subscribe to the topic
    c.SubscribeTopics([]string{"test-topic"}, nil)

    // Consume messages and commit offsets after processing
    for {
        ev := c.Poll(100)
        if ev == nil {
            continue
        }
        switch e := ev.(type) {
        case *kafka.Message:
            fmt.Printf("Received message: %s\n", string(e.Value))
            c.CommitMessage(e)
        case kafka.Error:
            fmt.Printf("Error: %v\n", e)
        }
    }

    // Close the Kafka consumer
    c.Close()
}
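SubscribeTopics takes an optional rebalance callback as its second argument (nil above); a sketch that logs partition assignment changes and applies them explicitly could look like this:

// A rebalance callback that logs assignment changes and applies them explicitly;
// it can be passed as the second argument to SubscribeTopics instead of nil.
rebalanceCb := func(consumer *kafka.Consumer, event kafka.Event) error {
    switch ev := event.(type) {
    case kafka.AssignedPartitions:
        fmt.Printf("Assigned partitions: %v\n", ev.Partitions)
        consumer.Assign(ev.Partitions)
    case kafka.RevokedPartitions:
        fmt.Printf("Revoked partitions: %v\n", ev.Partitions)
        consumer.Unassign()
    }
    return nil
}
c.SubscribeTopics([]string{"test-topic"}, rebalanceCb)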
package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // Configure the Kafka consumer
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":       "localhost:9092",
        "group.id":                "test-group",
        "auto.offset.reset":       "earliest",
        "enable.auto.commit":      true,   // enable automatic offset commits
        "auto.commit.interval.ms": 5000,   // commit offsets automatically every 5000 ms (5 s)
        "max.poll.interval.ms":    300000, // maximum allowed interval between two poll calls: 300000 ms (5 minutes)
        "session.timeout.ms":      10000,  // session timeout between the consumer and the broker: 10 s
        "heartbeat.interval.ms":   3000,   // interval at which the consumer sends heartbeats: 3000 ms (3 s)
    })
    if err != nil {
        fmt.Printf("Failed to create consumer: %s\n", err)
        return
    }

    // Subscribe to the topic
    c.SubscribeTopics([]string{"test-topic"}, nil)

    // Offsets are committed automatically
    for {
        ev := c.Poll(100)
        if ev == nil {
            continue
        }
        switch e := ev.(type) {
        case *kafka.Message:
            fmt.Printf("Received message: %s\n", string(e.Value))
        case kafka.Error:
            fmt.Printf("Error: %v\n", e)
        }
    }

    // Close the Kafka consumer
    c.Close()
}
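As written, the polling loop never exits, so c.Close() is not reached; a common variant (a sketch using the standard os, os/signal, and syscall packages, not part of the original) stops polling on an interrupt signal and then closes the consumer:

sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

run := true
for run {
    select {
    case sig := <-sigchan:
        // Stop polling once an interrupt or termination signal arrives.
        fmt.Printf("Caught signal %v: terminating\n", sig)
        run = false
    default:
        ev := c.Poll(100)
        if ev == nil {
            continue
        }
        switch e := ev.(type) {
        case *kafka.Message:
            fmt.Printf("Received message: %s\n", string(e.Value))
        case kafka.Error:
            fmt.Printf("Error: %v\n", e)
        }
    }
}
c.Close()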
package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // Configure the Kafka consumer
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":     "localhost:9092",
        "group.id":              "test-group",
        "auto.offset.reset":     "earliest",
        "enable.auto.commit":    false, // disable automatic commits; offsets are committed manually below
        "max.poll.interval.ms":  300000,
        "session.timeout.ms":    10000,
        "heartbeat.interval.ms": 3000,
    })
    if err != nil {
        fmt.Printf("Failed to create consumer: %s\n", err)
        return
    }

    // Subscribe to the topic
    c.SubscribeTopics([]string{"test-topic"}, nil)

    // Commit offsets manually
    for {
        ev := c.Poll(100)
        if ev == nil {
            continue
        }
        switch e := ev.(type) {
        case *kafka.Message:
            fmt.Printf("Received message: %s\n", string(e.Value))
            c.CommitMessage(e)
        case kafka.Error:
            fmt.Printf("Error: %v\n", e)
        }
    }

    // Close the Kafka consumer
    c.Close()
}
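Committing after every message, as above, costs one commit request per record; a coarser sketch commits the stored offsets once every 100 messages via Commit() (the msgCount counter is illustrative, and this relies on enable.auto.offset.store being left at its default of true):

msgCount := 0
for {
    ev := c.Poll(100)
    if ev == nil {
        continue
    }
    switch e := ev.(type) {
    case *kafka.Message:
        fmt.Printf("Received message: %s\n", string(e.Value))
        msgCount++
        // Commit the offsets stored so far once every 100 messages; with
        // enable.auto.offset.store=true (default), offsets are stored as
        // messages are returned by Poll.
        if msgCount%100 == 0 {
            if _, err := c.Commit(); err != nil {
                fmt.Printf("Commit failed: %v\n", err)
            }
        }
    case kafka.Error:
        fmt.Printf("Error: %v\n", e)
    }
}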