新功能发布记录
集群版本更新记录
产品公告
pulsar-client-go 库。
go get -u "github.com/apache/pulsar-client-go/pulsar"
import "github.com/apache/pulsar-client-go/pulsar"
// Create the Pulsar client.
client, err := pulsar.NewClient(pulsar.ClientOptions{
	// Service access address.
	URL: serviceUrl,
	// Secret key (token) of the authorized role.
	Authentication:    pulsar.NewAuthenticationToken(authentication),
	OperationTimeout:  30 * time.Second,
	ConnectionTimeout: 30 * time.Second,
})
if err != nil {
	log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
// Close the client when the enclosing function returns.
defer client.Close()
// Create a producer from the client.
producer, err := client.CreateProducer(pulsar.ProducerOptions{
	// Full topic path, in the form persistent://<cluster (tenant) ID>/<namespace>/<topic name>.
	Topic: "persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1",
})
if err != nil {
	log.Fatal(err)
}
// Close the producer when the enclosing function returns.
defer producer.Close()
Topic 名称需要填写完整路径,格式为 persistent://clusterid/namespace/Topic,其中 clusterid/namespace/topic 的部分可以从控制台上 Topic 管理页面直接复制。
// 发送消息_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{// 消息内容Payload: []byte("hello go client, this is a message."),// 业务keyKey: "yourKey",// 业务参数Properties: map[string]string{"key": "value"},})
// Create a consumer from the client.
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	// Full topic path, in the form persistent://<cluster (tenant) ID>/<namespace>/<topic name>.
	Topic: "persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1",
	// Subscription name.
	SubscriptionName: "topic1_sub",
	// Subscription type.
	Type: pulsar.Shared,
})
if err != nil {
	log.Fatal(err)
}
// Close the consumer when the enclosing function returns.
defer consumer.Close()
Topic 名称需要填写完整路径,格式为 persistent://clusterid/namespace/Topic,其中 clusterid/namespace/topic 的部分可以从控制台上 Topic 管理页面直接复制。

// 获取消息msg, err := consumer.Receive(context.Background())if err != nil {log.Fatal(err)}// 模拟业务处理fmt.Printf("Received message msgId: %#v -- content: '%s'\\n",msg.ID(), string(msg.Payload()))// 消费成功,回复ack,消费失败根据业务需要选择回复nack或ReconsumeLaterconsumer.Ack(msg)

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/apache/pulsar-client-go/pulsar"
	pulsarlog "github.com/apache/pulsar-client-go/pulsar/log"
	"gopkg.in/natefinch/lumberjack.v2"
)

// main builds a JSON slog logger that writes to a lumberjack-managed file and
// passes it to the Pulsar client.
func main() {
	fileLogger := &lumberjack.Logger{
		Filename:   "/tmp/pulsar-go-sdk.log",
		MaxSize:    100, // maximum size of a single log file, in MB
		MaxBackups: 5,   // maximum number of log files to keep
		MaxAge:     3,   // maximum retention time of log files, in days
		LocalTime:  true,
	}
	logger := slog.New(slog.NewJSONHandler(fileLogger, &slog.HandlerOptions{Level: slog.LevelInfo}))
	slog.SetDefault(logger)

	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
		// Pass the slog logger when initializing the client.
		// If the application has already set up its own slog logger, pass it here.
		Logger: pulsarlog.NewLoggerWithSlog(logger),
	})
	if err != nil {
		logger.Error("create client err", "error", err)
		return
	}
	defer client.Close()
	...
import (
	"context"
	"fmt"
	"log/slog"
	"os"

	"github.com/apache/pulsar-client-go/pulsar"
	pulsarlog "github.com/apache/pulsar-client-go/pulsar/log"
)

// main builds a JSON slog logger that writes to os.Stdout and passes it to the
// Pulsar client.
func main() {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
	slog.SetDefault(logger)

	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
		// Pass the slog logger when initializing the client.
		// If the application has already set up its own slog logger, pass it here.
		Logger: pulsarlog.NewLoggerWithSlog(logger),
	})
	if err != nil {
		logger.Error("create client err", "error", err)
		return
	}
	defer client.Close()
	...
{"time":"2025-12-11T21:46:04.633592529+08:00","level":"INFO","msg":"Published message","msgId":"144:3:0"}{"time":"2025-12-11T21:46:04.63529706+08:00","level":"INFO","msg":"Published message","msgId":"144:4:0"}{"time":"2025-12-11T21:46:04.636940226+08:00","level":"INFO","msg":"Published message","msgId":"144:5:0"}{"time":"2025-12-11T21:46:04.638605954+08:00","level":"INFO","msg":"Published message","msgId":"144:6:0"}{"time":"2025-12-11T21:46:04.640399743+08:00","level":"INFO","msg":"Published message","msgId":"144:7:0"}{"time":"2025-12-11T21:46:04.642193681+08:00","level":"INFO","msg":"Published message","msgId":"144:8:0"}{"time":"2025-12-11T21:46:04.643914022+08:00","level":"INFO","msg":"Published message","msgId":"144:9:0"}{"time":"2025-12-11T21:46:04.643993602+08:00","level":"INFO","msg":"Closing producer","topic":"persistent://public/default/topic-1","producer_name":"standalone-0-53","producerID":1}{"time":"2025-12-11T21:46:04.644043546+08:00","level":"INFO","msg":"Producer is shutting down. Close the reconnect event loop","topic":"persistent://public/default/topic-1","producer_name":"standalone-0-53","producerID":1}{"time":"2025-12-11T21:46:04.644612925+08:00","level":"INFO","msg":"Closed producer","topic":"persistent://public/default/topic-1","producer_name":"standalone-0-53","producerID":1}
import ("context""fmt""github.com/apache/pulsar-client-go/pulsar"pulsarlog "github.com/apache/pulsar-client-go/pulsar/log""github.com/sirupsen/logrus""gopkg.in/natefinch/lumberjack.v2")func main() {logger := logrus.New()fileLogger := &lumberjack.Logger{Filename: "/tmp/pulsar-go-sdk.log",MaxSize: 100, // 单个日志文件最大大小,单位 MBMaxBackups: 5, // 日志文件最大保留数量MaxAge: 3, // 日志文件最大保留时间,单位 天LocalTime: true,}logger.SetOutput(fileLogger)logger.SetLevel(logrus.InfoLevel)// 自定义时间格式logger.SetFormatter(&logrus.JSONFormatter{TimestampFormat: "2006-01-02 15:04:05",})client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650",// client 初始化时传入 logrus logger// 如果用户在业务代码中已经初始化自定义 logrus logger,则在这里传入即可Logger: pulsarlog.NewLoggerWithLogrus(logger),})if err != nil {logger.Error("create client err", "error", err)return}defer client.Close()...
import ("context""fmt""os""github.com/apache/pulsar-client-go/pulsar"pulsarlog "github.com/apache/pulsar-client-go/pulsar/log""github.com/sirupsen/logrus")func main() {logger := logrus.New()logger.SetOutput(os.Stdout)logger.SetLevel(logrus.InfoLevel)// 自定义时间格式logger.SetFormatter(&logrus.JSONFormatter{TimestampFormat: "2006-01-02 15:04:05",})client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650",// client 初始化时传入 logrus logger// 如果用户在业务代码中已经初始化自定义 logrus logger,则在这里传入即可Logger: pulsarlog.NewLoggerWithLogrus(logger),})if err != nil {logger.Error("create client err", "error", err)return}defer client.Close()
{"level":"info","msg":"Published message, msgId = 145:23:0","time":"2025-12-11 21:51:03"}{"level":"info","msg":"Published message, msgId = 145:24:0","time":"2025-12-11 21:51:03"}{"level":"info","msg":"Published message, msgId = 145:25:0","time":"2025-12-11 21:51:03"}{"level":"info","msg":"Published message, msgId = 145:26:0","time":"2025-12-11 21:51:03"}{"level":"info","msg":"Published message, msgId = 145:27:0","time":"2025-12-11 21:51:03"}{"level":"info","msg":"Published message, msgId = 145:28:0","time":"2025-12-11 21:51:03"}{"level":"info","msg":"Published message, msgId = 145:29:0","time":"2025-12-11 21:51:03"}{"level":"info","msg":"Closing producer","producerID":1,"producer_name":"standalone-0-56","time":"2025-12-11 21:51:03","topic":"persistent://public/default/topic-1"}{"level":"info","msg":"Producer is shutting down. Close the reconnect event loop","producerID":1,"producer_name":"standalone-0-56","time":"2025-12-11 21:51:03","topic":"persistent://public/default/topic-1"}{"level":"info","msg":"Closed producer","producerID":1,"producer_name":"standalone-0-56","time":"2025-12-11 21:51:03","topic":"persistent://public/default/topic-1"}
// New returns a Logger that writes to os.Stderr using a TextFormatter at InfoLevel.
// It's recommended to make this a global instance called `log`.
func New() *Logger {
	return &Logger{
		Out:          os.Stderr, // default output
		Formatter:    new(TextFormatter),
		Hooks:        make(LevelHooks),
		Level:        InfoLevel,
		ExitFunc:     os.Exit,
		ReportCaller: false,
	}
}
Go SDK 默认使用 logrus 作为日志库,而 logrus 默认将日志输出到 os.Stderr,如果用户没有自定义日志 lib 的话,Go SDK 的日志就会和业务日志混在一起,增加了问题定位的难度。
// ClientOptions is used to construct a Pulsar Client instance.
type ClientOptions struct {
	// Configure the logger used by the client.
	// By default, a wrapped logrus.StandardLogger will be used, namely,
	// log.NewLoggerWithLogrus(logrus.StandardLogger())
	// FIXME: use `logger` as internal field name instead of `log` as it's more idiomatic
	Logger log.Logger
}
logger 接口的形式,自定义 log lib,来达到将日志重定向到指定位置的目的。下面以 logrus 为例,自定义一个 log lib,将 Go SDK 的日志输出到指定文件:package mainimport ("fmt""io""os""github.com/apache/pulsar-client-go/pulsar/log""github.com/sirupsen/logrus")// logrusWrapper implements Logger interface// based on underlying logrus.FieldLoggertype logrusWrapper struct {l logrus.FieldLogger}// NewLoggerWithLogrus creates a new logger which wraps// the given logrus.Loggerfunc NewLoggerWithLogrus(logger *logrus.Logger, outputPath string) log.Logger {writer1 := os.Stdoutwriter2, err := os.OpenFile(outputPath, os.O_WRONLY|os.O_CREATE, 0755)if err != nil {logrus.Error("create file log.txt failed: %v", err)}logger.SetOutput(io.MultiWriter(writer1, writer2))return &logrusWrapper{l: logger,}}func (l *logrusWrapper) SubLogger(fs log.Fields) log.Logger {return &logrusWrapper{l: l.l.WithFields(logrus.Fields(fs)),}}func (l *logrusWrapper) WithFields(fs log.Fields) log.Entry {return logrusEntry{e: l.l.WithFields(logrus.Fields(fs)),}}func (l *logrusWrapper) WithField(name string, value interface{}) log.Entry {return logrusEntry{e: l.l.WithField(name, value),}}func (l *logrusWrapper) WithError(err error) log.Entry {return logrusEntry{e: l.l.WithError(err),}}func (l *logrusWrapper) Debug(args ...interface{}) {l.l.Debug(args...)}func (l *logrusWrapper) Info(args ...interface{}) {l.l.Info(args...)}func (l *logrusWrapper) Warn(args ...interface{}) {l.l.Warn(args...)}func (l *logrusWrapper) Error(args ...interface{}) {l.l.Error(args...)}func (l *logrusWrapper) Debugf(format string, args ...interface{}) {l.l.Debugf(format, args...)}func (l *logrusWrapper) Infof(format string, args ...interface{}) {l.l.Infof(format, args...)}func (l *logrusWrapper) Warnf(format string, args ...interface{}) {l.l.Warnf(format, args...)}func (l *logrusWrapper) Errorf(format string, args ...interface{}) {l.l.Errorf(format, args...)}type logrusEntry struct {e logrus.FieldLogger}func (l logrusEntry) WithFields(fs log.Fields) log.Entry {return logrusEntry{e: 
l.e.WithFields(logrus.Fields(fs)),}}func (l logrusEntry) WithField(name string, value interface{}) log.Entry {return logrusEntry{e: l.e.WithField(name, value),}}func (l logrusEntry) Debug(args ...interface{}) {l.e.Debug(args...)}func (l logrusEntry) Info(args ...interface{}) {l.e.Info(args...)}func (l logrusEntry) Warn(args ...interface{}) {l.e.Warn(args...)}func (l logrusEntry) Error(args ...interface{}) {l.e.Error(args...)}func (l logrusEntry) Debugf(format string, args ...interface{}) {l.e.Debugf(format, args...)}func (l logrusEntry) Infof(format string, args ...interface{}) {l.e.Infof(format, args...)}func (l logrusEntry) Warnf(format string, args ...interface{}) {l.e.Warnf(format, args...)}func (l logrusEntry) Errorf(format string, args ...interface{}) {l.e.Errorf(format, args...)}
// Create the client with the custom logger. The pulsar `log` package has no
// StandardLogger, so the logrus standard logger is passed instead.
client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL:    "pulsar://localhost:6650",
	Logger: NewLoggerWithLogrus(logrus.StandardLogger(), "test.log"),
})
文档反馈