tencent cloud

消息队列 Pulsar 版

动态与公告
新功能发布记录
集群版本更新记录
产品公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 Pulsar 版
产品优势
应用场景
技术原理
产品系列
开源 Pulsar 版本支持说明
与开源 Pulsar 对比
高可用
配额与限制
基础概念
产品计费
计费概述
价格说明
计费示例
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
使用 SDK 收发普通消息
使用 SDK 收发高级特性消息
用户指南
使用流程指引
配置账号权限
新建集群
配置命名空间
配置 Topic
连接集群
管理集群
查询消息及轨迹
跨地域复制
查看监控和配置告警
实践教程
客户端使用实践
异常消费者隔离
限流机制说明
交易对账
消息幂等性
消息压缩
迁移指南
单写多读集群迁移方案
虚拟集群平滑迁移至专业集群
API 参考
API 概览
SDK 参考
SDK 概述
SDK 配置参数推荐
TCP 协议(Pulsar 社区版)
安全与合规
权限管理
删除保护
云 API 审计
常见问题
监控相关
客户端相关
服务协议
服务等级协议
TDMQ 政策
联系我们
词汇表

Go SDK

PDF
聚焦模式
字号
最后更新时间: 2025-12-24 15:26:39

操作场景

本文以调用 Go SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

已参考 SDK 概述,获取相关的客户端连接参数
说明:
推荐使用 0.13.1 及以上版本。

操作步骤

1. 在客户端环境引入 pulsar-client-go 库。
1.1 在客户端环境执行如下命令下载 Pulsar 客户端相关的依赖包。
go get -u "github.com/apache/pulsar-client-go/pulsar"
1.2 安装完成后,即可通过以下代码引用到您的 Go 工程文件中。
import "github.com/apache/pulsar-client-go/pulsar"
2. 创建 Pulsar Client。
// 创建pulsar客户端
client, err := pulsar.NewClient(pulsar.ClientOptions{
// 服务接入地址
URL: serviceUrl,
// 授权角色密钥
Authentication: pulsar.NewAuthenticationToken(authentication),
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})

if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}

defer client.Close()

参数
说明
serviceUrl
集群接入地址,可以在控制台 集群管理 页面查看并复制。
img


Authentication
角色密钥,在 角色管理 页面复制密钥列。
img


3. 创建生产者。
// 使用客户端创建生产者
producer, err := client.CreateProducer(pulsar.ProducerOptions{
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
Topic: "persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1",
})

if err != nil {
log.Fatal(err)
}
defer producer.Close()
说明
Topic 名称需要填入完整路径,即 persistent://clusterid/namespace/Topicclusterid/namespace/topic 的部分可以从控制台上 Topic管理 页面直接复制。
4. 发送消息。
// 发送消息
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
// 消息内容
Payload: []byte("hello go client, this is a message."),
// 业务key
Key: "yourKey",
// 业务参数
Properties: map[string]string{"key": "value"},
})
5. 创建消费者。
// 使用客户端创建消费者
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
Topic: "persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1",
// 订阅名称
SubscriptionName: "topic1_sub",
// 订阅模式
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
说明
Topic 名称需要填入完整路径,即 persistent://clusterid/namespace/Topicclusterid/namespace/topic 的部分可以从控制台上 Topic管理 页面直接复制。
img


subscriptionName 需要写入订阅名,可在消费管理界面查看。
6. 消费消息。
// 获取消息
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
// 模拟业务处理
fmt.Printf("Received message msgId: %#v -- content: '%s'\\n",
msg.ID(), string(msg.Payload()))

// 消费成功,回复ack,消费失败根据业务需要选择回复nack或ReconsumeLater
consumer.Ack(msg)
7. 登录 TDMQ Pulsar 版控制台,依次点击 Topic 管理 > Topic 名称进入消费管理页面,点开订阅名下方右三角号,可查看生产消费记录。
img


说明
上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 DemoPulsar 官方文档

基于 slog/logrus 的日志输出

目前 pulsar-client-go 0.13 以上版本已经支持标准库 log/slog 以及 sirupsen/logrus 的封装,可以直接使用 slog 和 logrus 作为客户端的日志输出插件。

基于 slog 输出

输出日志到文件

import (
"context"
"fmt"
"log/slog"

"github.com/apache/pulsar-client-go/pulsar"
pulsarlog "github.com/apache/pulsar-client-go/pulsar/log"
"gopkg.in/natefinch/lumberjack.v2"
)

func main() {
fileLogger := &lumberjack.Logger{
Filename: "/tmp/pulsar-go-sdk.log",
MaxSize: 100, // 单个日志文件最大大小,单位 MB
MaxBackups: 5, // 日志文件最大保留数量
MaxAge: 3, // 日志文件最大保留时间,单位 天
LocalTime: true,
}
logger := slog.New(slog.NewJSONHandler(fileLogger, &slog.HandlerOptions{Level: slog.LevelInfo}))
slog.SetDefault(logger)

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
// client 初始化时传入 slog logger
// 如果用户在业务代码中已经初始化自定义 slog logger,则在这里传入即可
Logger: pulsarlog.NewLoggerWithSlog(logger),
})
if err != nil {
logger.Error("create client err", "error", err)
return
}
defer client.Close()
...


输出日志到终端的标准输出

import (
"context"
"fmt"
"log/slog"
"os"

"github.com/apache/pulsar-client-go/pulsar"
pulsarlog "github.com/apache/pulsar-client-go/pulsar/log"
)

func main() {
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
slog.SetDefault(logger)

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
// client 初始化时传入 slog logger
// 如果用户在业务代码中已经初始化自定义 slog logger,则在这里传入即可
Logger: pulsarlog.NewLoggerWithSlog(logger),
})
if err != nil {
logger.Error("create client err", "error", err)
return
}
defer client.Close()
...

日志输出示例

{"time":"2025-12-11T21:46:04.633592529+08:00","level":"INFO","msg":"Published message","msgId":"144:3:0"}
{"time":"2025-12-11T21:46:04.63529706+08:00","level":"INFO","msg":"Published message","msgId":"144:4:0"}
{"time":"2025-12-11T21:46:04.636940226+08:00","level":"INFO","msg":"Published message","msgId":"144:5:0"}
{"time":"2025-12-11T21:46:04.638605954+08:00","level":"INFO","msg":"Published message","msgId":"144:6:0"}
{"time":"2025-12-11T21:46:04.640399743+08:00","level":"INFO","msg":"Published message","msgId":"144:7:0"}
{"time":"2025-12-11T21:46:04.642193681+08:00","level":"INFO","msg":"Published message","msgId":"144:8:0"}
{"time":"2025-12-11T21:46:04.643914022+08:00","level":"INFO","msg":"Published message","msgId":"144:9:0"}
{"time":"2025-12-11T21:46:04.643993602+08:00","level":"INFO","msg":"Closing producer","topic":"persistent://public/default/topic-1","producer_name":"standalone-0-53","producerID":1}
{"time":"2025-12-11T21:46:04.644043546+08:00","level":"INFO","msg":"Producer is shutting down. Close the reconnect event loop","topic":"persistent://public/default/topic-1","producer_name":"standalone-0-53","producerID":1}
{"time":"2025-12-11T21:46:04.644612925+08:00","level":"INFO","msg":"Closed producer","topic":"persistent://public/default/topic-1","producer_name":"standalone-0-53","producerID":1}

基于 Logrus 输出

输出日志到文件

import (
"context"
"fmt"

"github.com/apache/pulsar-client-go/pulsar"
pulsarlog "github.com/apache/pulsar-client-go/pulsar/log"
"github.com/sirupsen/logrus"
"gopkg.in/natefinch/lumberjack.v2"
)

func main() {
logger := logrus.New()
fileLogger := &lumberjack.Logger{
Filename: "/tmp/pulsar-go-sdk.log",
MaxSize: 100, // 单个日志文件最大大小,单位 MB
MaxBackups: 5, // 日志文件最大保留数量
MaxAge: 3, // 日志文件最大保留时间,单位 天
LocalTime: true,
}
logger.SetOutput(fileLogger)
logger.SetLevel(logrus.InfoLevel)
// 自定义时间格式
logger.SetFormatter(&logrus.JSONFormatter{
TimestampFormat: "2006-01-02 15:04:05",
})

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
// client 初始化时传入 logrus logger
// 如果用户在业务代码中已经初始化自定义 logrus logger,则在这里传入即可
Logger: pulsarlog.NewLoggerWithLogrus(logger),
})
if err != nil {
logger.Error("create client err", "error", err)
return
}
defer client.Close()
...

输出日志到终端的标准输出

import (
"context"
"fmt"
"os"

"github.com/apache/pulsar-client-go/pulsar"
pulsarlog "github.com/apache/pulsar-client-go/pulsar/log"
"github.com/sirupsen/logrus"
)

func main() {
logger := logrus.New()
logger.SetOutput(os.Stdout)
logger.SetLevel(logrus.InfoLevel)
// 自定义时间格式
logger.SetFormatter(&logrus.JSONFormatter{
TimestampFormat: "2006-01-02 15:04:05",
})

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
// client 初始化时传入 logrus logger
// 如果用户在业务代码中已经初始化自定义 logrus logger,则在这里传入即可
Logger: pulsarlog.NewLoggerWithLogrus(logger),
})
if err != nil {
logger.Error("create client err", "error", err)
return
}
defer client.Close()

日志输出示例

{"level":"info","msg":"Published message, msgId = 145:23:0","time":"2025-12-11 21:51:03"}
{"level":"info","msg":"Published message, msgId = 145:24:0","time":"2025-12-11 21:51:03"}
{"level":"info","msg":"Published message, msgId = 145:25:0","time":"2025-12-11 21:51:03"}
{"level":"info","msg":"Published message, msgId = 145:26:0","time":"2025-12-11 21:51:03"}
{"level":"info","msg":"Published message, msgId = 145:27:0","time":"2025-12-11 21:51:03"}
{"level":"info","msg":"Published message, msgId = 145:28:0","time":"2025-12-11 21:51:03"}
{"level":"info","msg":"Published message, msgId = 145:29:0","time":"2025-12-11 21:51:03"}
{"level":"info","msg":"Closing producer","producerID":1,"producer_name":"standalone-0-56","time":"2025-12-11 21:51:03","topic":"persistent://public/default/topic-1"}
{"level":"info","msg":"Producer is shutting down. Close the reconnect event loop","producerID":1,"producer_name":"standalone-0-56","time":"2025-12-11 21:51:03","topic":"persistent://public/default/topic-1"}
{"level":"info","msg":"Closed producer","producerID":1,"producer_name":"standalone-0-56","time":"2025-12-11 21:51:03","topic":"persistent://public/default/topic-1"}

自定义日志文件输出

使用场景

很多用户在使用 Pulsar Go SDK 时,未能自定义指定日志输出,Go SDK 默认将日志输出到了 os.Stderr 中去,具体如下:
// It's recommended to make this a global instance called `log`.
func New() *Logger {
return &Logger{
Out: os.Stderr, // 默认输出
Formatter: new(TextFormatter),
Hooks: make(LevelHooks),
Level: InfoLevel,
ExitFunc: os.Exit,
ReportCaller: false,
}
}
由于日志信息的默认输出大都为 os.Stderr,如果用户没有自定义日志 lib 的话,Go SDK 的日志就会和业务日志混淆到一起,增加了问题定位的难度。

解决方案

Go SDK 在 Client 侧暴露了一个 logger 的接口,可以支持用户自定义自己的 log 输出的格式以及位置等功能,同时也支持使用 logrus 以及 zap 等不同的日志 lib,具体参数如下:
1. 自定义 log lib 实现 Pulsar Go SDK 提供的 log.Logger 的接口:
// ClientOptions is used to construct a Pulsar Client instance.
type ClientOptions struct {
// Configure the logger used by the client.
// By default, a wrapped logrus.StandardLogger will be used, namely,
// log.NewLoggerWithLogrus(logrus.StandardLogger())
// FIXME: use `logger` as internal field name instead of `log` as it's more idiomatic
Logger log.Logger
}
所以用户在使用 Go SDK 时,可以通过自定义 logger 接口的形式,自定义 log lib,来达到将日志重定向到指定位置的目的。下面以 logrus 为例,自定义一个 log lib,将 Go SDK 的日志输出到指定文件:
package main

import (
"fmt"
"io"
"os"

"github.com/apache/pulsar-client-go/pulsar/log"
"github.com/sirupsen/logrus"

)

// logrusWrapper implements Logger interface
// based on underlying logrus.FieldLogger
type logrusWrapper struct {
l logrus.FieldLogger
}

// NewLoggerWithLogrus creates a new logger which wraps
// the given logrus.Logger
func NewLoggerWithLogrus(logger *logrus.Logger, outputPath string) log.Logger {
writer1 := os.Stdout
writer2, err := os.OpenFile(outputPath, os.O_WRONLY|os.O_CREATE, 0755)
if err != nil {
logrus.Error("create file log.txt failed: %v", err)
}
logger.SetOutput(io.MultiWriter(writer1, writer2))
return &logrusWrapper{
l: logger,
}
}

func (l *logrusWrapper) SubLogger(fs log.Fields) log.Logger {
return &logrusWrapper{
l: l.l.WithFields(logrus.Fields(fs)),
}
}

func (l *logrusWrapper) WithFields(fs log.Fields) log.Entry {
return logrusEntry{
e: l.l.WithFields(logrus.Fields(fs)),
}
}

func (l *logrusWrapper) WithField(name string, value interface{}) log.Entry {
return logrusEntry{
e: l.l.WithField(name, value),
}
}

func (l *logrusWrapper) WithError(err error) log.Entry {
return logrusEntry{
e: l.l.WithError(err),
}
}

func (l *logrusWrapper) Debug(args ...interface{}) {
l.l.Debug(args...)
}

func (l *logrusWrapper) Info(args ...interface{}) {
l.l.Info(args...)
}

func (l *logrusWrapper) Warn(args ...interface{}) {
l.l.Warn(args...)
}

func (l *logrusWrapper) Error(args ...interface{}) {
l.l.Error(args...)
}

func (l *logrusWrapper) Debugf(format string, args ...interface{}) {
l.l.Debugf(format, args...)
}

func (l *logrusWrapper) Infof(format string, args ...interface{}) {
l.l.Infof(format, args...)
}

func (l *logrusWrapper) Warnf(format string, args ...interface{}) {
l.l.Warnf(format, args...)
}

func (l *logrusWrapper) Errorf(format string, args ...interface{}) {
l.l.Errorf(format, args...)
}

type logrusEntry struct {
e logrus.FieldLogger
}

func (l logrusEntry) WithFields(fs log.Fields) log.Entry {
return logrusEntry{
e: l.e.WithFields(logrus.Fields(fs)),
}
}

func (l logrusEntry) WithField(name string, value interface{}) log.Entry {
return logrusEntry{
e: l.e.WithField(name, value),
}
}

func (l logrusEntry) Debug(args ...interface{}) {
l.e.Debug(args...)
}

func (l logrusEntry) Info(args ...interface{}) {
l.e.Info(args...)
}

func (l logrusEntry) Warn(args ...interface{}) {
l.e.Warn(args...)
}

func (l logrusEntry) Error(args ...interface{}) {
l.e.Error(args...)
}

func (l logrusEntry) Debugf(format string, args ...interface{}) {
l.e.Debugf(format, args...)
}

func (l logrusEntry) Infof(format string, args ...interface{}) {
l.e.Infof(format, args...)
}

func (l logrusEntry) Warnf(format string, args ...interface{}) {
l.e.Warnf(format, args...)
}

func (l logrusEntry) Errorf(format string, args ...interface{}) {
l.e.Errorf(format, args...)
}
2. 在创建 client 的时候,指定自定义的 log lib。
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
Logger: NewLoggerWithLogrus(log.StandardLogger(), "test.log"),
})
通过上述 Demo 示例,即可将 Pulsar Go SDK 的日志文件重定向到了当前目录的 test.log 的文件中,用户可以根据自己的需要将日志文件重定向到指定的位置。

SDK 版本相关

社区 issue 及优化点

Go SDK 版本 v0.9.0 以下版本需要先升级到 v0.13.1 再升级 broker,v0.9 及以上版本没遇到问题的情况下,可以不用升级客户端。
较高版本已修复下列严重问题:
1. 修复重连阻塞问题,会导致发送超时(相关参考文档)。
2. 修复连接泄露问题,会导致消费者暴涨(相关参考文档)。
3. 集群变更或者节点故障等会导致消费阻塞,unload 或者消费者重启可以恢复(相关参考文档)。
4. 连接重连的时候可能会触发发送超时(相关参考文档)。
5. 修复消息被 discarded 时,availablePermits 泄露导致消费阻塞问题(相关参考文档)。
6. 修复某些异常情况下 availablePermitsCh 阻塞问题(相关参考文档)。
7. 修复部分异常情况下,同步发送阻塞的问题(相关参考文档)。
8. 支持 group ack 能力,较大提升客户端消费性能(相关参考文档) 。
9. 修复回退时间失效问题,每次都按照 100ms 退避重试(相关参考文档)。
10. 修复非 batch 模式下,消息key设置失败的问题(相关参考文档)。
11. 修复发送失败的时候,将发送 pending 队列中的消息全部设置为失败(相关参考文档)。
12. 修复大量重复发送消息的 bug(解决延迟消息超过服务端限制或者客户端断连重连之后产生的 bug)(相关参考文档)。
完整的社区 issue 参见:相关参考文档

低版本风险隐患

Go SDK v0.9.0 以下(不包含 v0.9.0 )版本,对于极端场景异常处理覆盖不够全面,当 broker 升级重启或网络故障等场景下,有极小概率客户端和服务端重连过程出现异常,导致发送超时或者停止消费等问题。这里强烈建议您先将客户端升级到 v0.13.1 新版本后,再进行 broker 集群版本的更新。

低版本隐患处理手段

高版本的客户端在 broker 升级过程中能够正常重连,基本做到业务无感知。但如果您的客户端 SDK 确实无法升级到新版本,建议您在 broker 集群升级后,关注客户端的日志输出及控制台的生产消费相关指标。
如果出现生产消费卡住的情况,请及时重启客户端,理论上客户端在重启后可恢复正常。如果重启客户端后依旧无法改善,请及时 提交工单 进一步定位处理。


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈