新功能发布记录
Broker 版本升级记录
公告
go get -v gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
参数 | 描述 |
topic | Topic 名称,您可以在控制台上 topic 管理页面复制。 ![]() |
sasl.username | 用户名,在控制台ACL策略管理下的用户管理页面创建用户时设置。 |
sasl.password | 用户密码,在控制台ACL策略管理下的用户管理页面创建用户时设置。 |
bootstrapServers | 接入网络,在控制台的实例详情页面接入方式模块的网络列复制。 ![]() |
consumerGroupId | 您可以自定义设置,Demo 运行成功后可以在 Consumer Group 页面看到该消费者。 |
// main.gopackage mainimport ("fmt""github.com/confluentinc/confluent-kafka-go/kafka")func main() {// ======== 请替换为你的实际配置 ========brokers := "$bootstrapServers" // Kafka 地址topic := "$topic" // topic名字saslUsername := "$sasl.username" // SASL 用户名saslPassword := "$sasl.password" // SASL 密码caFile := "./CARoot.pem" // CA 证书路径,控制台下载retries := 5 // 重试次数message := "Hello from Go with SASL+SSL"// =====================================config := &kafka.ConfigMap{"bootstrap.servers": brokers,"security.protocol": "sasl_ssl", // 启用 SASL over SSL"ssl.ca.location": caFile, // 指定 CA 证书"sasl.mechanisms": "PLAIN", // 认证机制"sasl.username": saslUsername,"sasl.password": saslPassword,"acks": "1", // 只等待 leader 确认"message.send.max.retries": retries,"retry.backoff.ms": 1000,"socket.timeout.ms": 30000,"session.timeout.ms": 30000,"enable.idempotence": false, // 关闭幂等,避免强制 acks=all"max.in.flight.requests.per.connection": 5,}// 创建生产者p, err := kafka.NewProducer(config)if err != nil {fmt.Printf("创建生产者失败: %v\\n", err)return}defer p.Close()fmt.Printf("生产者已启动,连接到 %s\\n", brokers)// 发送消息err = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic,Partition: kafka.PartitionAny,},Value: []byte(message),}, nil)if err != nil {fmt.Printf("消息发送失败: %v\\n", err)} else {fmt.Printf("已发送消息: %s\\n", message)}p.Flush(3*1000)}
go run main.go


// main.gopackage mainimport ("fmt""github.com/confluentinc/confluent-kafka-go/kafka")func main() {// ======== 替换为你的配置 ========brokers := "$bootstrapServers" // Kafka 地址groupID := "$consumerGroupId" // 消费组名字topic := "$topic" // topic名字saslUsername := "$sasl.username" // SASL 用户名saslPassword := "$sasl.password" // SASL 密码caFile := "./CARoot.pem" // CA 证书路径,控制台下载// ==============================// 配置消费者config := &kafka.ConfigMap{"bootstrap.servers": brokers,"security.protocol": "sasl_ssl","ssl.ca.location": caFile,"sasl.mechanisms": "PLAIN","sasl.username": saslUsername,"sasl.password": saslPassword,"group.id": groupID,"auto.offset.reset": "earliest","enable.auto.commit": true, // 启用自动提交"auto.commit.interval.ms": 5000, // 每 5 秒提交一次}// 创建消费者c, err := kafka.NewConsumer(config)if err != nil {panic(err)}defer c.Close()// 订阅主题c.SubscribeTopics([]string{topic}, nil)// 持续拉取消息for {ev := c.Poll(1000)if ev == nil {continue}// 类型断言if msg, ok := ev.(*kafka.Message); ok {if msg.Value != nil {fmt.Printf("收到: %s\\n", string(msg.Value))}}}}
go run main.go


文档反馈