tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

VPC 网络接入

PDF
Mode fokus
Ukuran font
Terakhir diperbarui: 2026-01-05 15:20:01

操作场景

本文以 Go 客户端为例介绍在 VPC 网络环境下接入消息队列 CKafka 版并收发消息的过程。

前提条件

已参考 SDK 概述,获取相关的客户端连接参数

操作步骤

步骤1:准备配置

1. 将下载的 Demo 中的 gokafkademo 上传至 Linux 服务器。
2. 登录 Linux 服务器,进入 gokafkademo 目录,执行以下命令添加依赖库。
go get -v gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
3. 修改配置文件 kafka.json。
{
"topic": [
"test"
],
"bootstrapServers": [
"xx.xx.xx.xx:xxxx"
],
"consumerGroupId": "yourConsumerId"
}
参数
描述
topic
Topic名称,您可以在控制台上 topic 列表页面复制。
bootstrapServers
接入网络,在控制台的实例基本信息页面接入方式模块的网络列复制。
consumerGroupId
您可以自定义设置,Demo 运行成功后可以在 Consumer Group 页面看到该消费者。

步骤2:发送消息

1. 编写生产消息程序。
package main
import (
"fmt"
"gokafkademo/config"
"log"
"strings"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
cfg, err := config.ParseConfig("../config/kafka.json")
if err != nil {
log.Fatal(err)
}
p, err := kafka.NewProducer(&kafka.ConfigMap{
// 设置接入点,请通过控制台获取对应Topic的接入点。
"bootstrap.servers": strings.Join(cfg.Servers, ","),
// 用户不显示配置时,默认值为1。用户根据自己的业务情况进行设置
"acks": 1,
// 请求发生错误时重试次数,建议将该值设置为大于0,失败重试最大程度保证消息不丢失
"retries": 0,
// 发送请求失败时到下一次重试请求之间的时间
"retry.backoff.ms": 100,
// producer 网络请求的超时时间。
"socket.timeout.ms": 6000,
// 设置客户端内部重试间隔。
"reconnect.backoff.max.ms": 3000,
})
if err != nil {
log.Fatal(err)
}
defer p.Close()
// 产生的消息 传递至报告处理程序
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\\n",ev.TopicPartition)
}
}
}
}()
// 异步发送消息
topic := cfg.Topic[0]
for _, word := range []string{"Confluent-Kafka", "Golang Client Message"} {
_ = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
}
// 等待消息传递
p.Flush(10 * 1000)
}
2. 编译并运行程序发送消息。
go run main.go
3. 查看运行结果,示例如下。
Delivered message to test[0]@628
Delivered message to test[0]@629
4. CKafka 控制台Topic 列表页面,选择对应的 Topic,单击更多 > 消息查询,查看刚刚发送的消息。


步骤3:消费消息

1. 编写消费消息程序。
package main

import (
"fmt"
"gokafkademo/config"
"log"
"strings"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

cfg, err := config.ParseConfig("../config/kafka.json")
if err != nil {
log.Fatal(err)
}

c, err := kafka.NewConsumer(&kafka.ConfigMap{
// 设置接入点,请通过控制台获取对应Topic的接入点。
"bootstrap.servers": strings.Join(cfg.Servers, ","),
// 设置的消息消费组
"group.id": cfg.ConsumerGroupId,
"auto.offset.reset": "earliest",

// 使用 Kafka 消费分组机制时,消费者超时时间。当 Broker 在该时间内没有收到消费者的心跳时,认为该消费者故障失败,Broker
// 发起重新 Rebalance 过程。目前该值的配置必须在 Broker 配置group.min.session.timeout.ms=6000和group.max.session.timeout.ms=300000 之间
"session.timeout.ms": 10000,
})

if err != nil {
log.Fatal(err)
}
// 订阅的消息topic 列表
err = c.SubscribeTopics(cfg.Topic, nil)
if err != nil {
log.Fatal(err)
}

for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\\n", msg.TopicPartition, string(msg.Value))
} else {
// 客户端将自动尝试恢复所有的 error
fmt.Printf("Consumer error: %v (%v)\\n", err, msg)
}
}

c.Close()
}
2. 编译并运行程序消费消息。
go run main.go
3. 查看运行结果,示例如下。
Message on test[0]@628: Confluent-Kafka
Message on test[0]@629: Golang Client Message
4. CKafka 控制台Consumer Group 页面,选择对应的消费组名称,在主题名称输入 Topic 名称,单击查看详情,查看消费详情。


Bantuan dan Dukungan

Apakah halaman ini membantu?

masukan