
Last updated:2025-12-24 15:07:56
Go SDK

Scenarios

This document uses the Go SDK as an example to describe how to send and receive messages with an open-source SDK, helping you understand the complete message sending and receiving process.

Prerequisites

You have obtained the client connection parameters as instructed in SDK Overview.
You have installed Go.
Note:
We recommend version 0.13.1 or later of the Go SDK (pulsar-client-go).

Operation Steps

1. Import the pulsar-client-go library in the client environment.
1.1 Execute the following command in the client environment to download dependencies related to the Pulsar client.
go get -u "github.com/apache/pulsar-client-go/pulsar"
1.2 After the installation is complete, import the library in your Go project file with the following code.
import "github.com/apache/pulsar-client-go/pulsar"
2. Create a Pulsar client.
// Create a Pulsar client.
client, err := pulsar.NewClient(pulsar.ClientOptions{
    // The address used to access the service.
    URL: serviceUrl,
    // The authorized role token.
    Authentication:    pulsar.NewAuthenticationToken(authentication),
    OperationTimeout:  30 * time.Second,
    ConnectionTimeout: 30 * time.Second,
})

if err != nil {
    log.Fatalf("Could not instantiate Pulsar client: %v", err)
}

defer client.Close()

Parameter | Description
serviceUrl | Address used to access the cluster. You can view and copy the address from the Cluster page in the console.
Authentication | Role token. You can copy the token from the Role Management page.
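One way to supply these two values without hard-coding them is to read them from the environment. The sketch below is only an illustration: the variable names PULSAR_SERVICE_URL and PULSAR_TOKEN and the loadConnParams helper are assumptions, not part of the SDK or the console.

```go
package main

import (
    "errors"
    "fmt"
    "os"
    "strings"
)

// connParams holds the two connection values described above.
type connParams struct {
    ServiceURL string // cluster access address copied from the console
    Token      string // role token copied from the Role Management page
}

// loadConnParams reads the parameters through getenv (normally os.Getenv).
// The environment variable names are hypothetical; choose your own.
func loadConnParams(getenv func(string) string) (connParams, error) {
    p := connParams{
        ServiceURL: strings.TrimSpace(getenv("PULSAR_SERVICE_URL")),
        Token:      strings.TrimSpace(getenv("PULSAR_TOKEN")),
    }
    if p.ServiceURL == "" {
        return connParams{}, errors.New("PULSAR_SERVICE_URL is not set")
    }
    if p.Token == "" {
        return connParams{}, errors.New("PULSAR_TOKEN is not set")
    }
    return p, nil
}

func main() {
    p, err := loadConnParams(os.Getenv)
    if err != nil {
        fmt.Println("missing connection parameter:", err)
        return
    }
    // p.ServiceURL would be used as URL, and p.Token would be passed to
    // pulsar.NewAuthenticationToken when creating the client.
    fmt.Println("service URL loaded:", p.ServiceURL)
}
```

Passing getenv as a function keeps the helper easy to exercise without touching the real process environment.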
3. Create a producer.
// Create a producer using the client.
producer, err := client.CreateProducer(pulsar.ProducerOptions{
    // The full topic path in the format of persistent://cluster (tenant) ID/namespace/topic.
    Topic: "persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1",
})

if err != nil {
    log.Fatal(err)
}
defer producer.Close()
Note
You need to provide the full path of the topic in the format of persistent://clusterid/namespace/topic. You can copy the clusterid/namespace/topic part from the Topic page in the console.
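If you keep the cluster ID, namespace, and topic name as separate settings, the full path can be assembled in code. The following is a minimal sketch; fullTopic is a hypothetical helper, not an SDK function, and it only rejects empty parts and parts that contain a slash.

```go
package main

import (
    "fmt"
    "strings"
)

// fullTopic joins the three parts copied from the console into the
// persistent://clusterid/namespace/topic form expected by the Topic field.
func fullTopic(clusterID, namespace, topic string) (string, error) {
    for _, part := range []string{clusterID, namespace, topic} {
        if part == "" || strings.Contains(part, "/") {
            return "", fmt.Errorf("invalid topic component %q", part)
        }
    }
    return "persistent://" + clusterID + "/" + namespace + "/" + topic, nil
}

func main() {
    name, err := fullTopic("pulsar-mmqwr5xx9n7g", "sdk_go", "topic1")
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(name) // persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1
}
```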
4. Send a message.
// Send a message.
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
    // The message content.
    Payload: []byte("hello go client, this is a message."),
    // The business key.
    Key: "yourKey",
    // The business parameters.
    Properties: map[string]string{"key": "value"},
})
// Check whether the send succeeded.
if err != nil {
    log.Fatal(err)
}
5. Create a consumer.
// Create a consumer using the client.
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    // The full topic path in the format of persistent://cluster (tenant) ID/namespace/topic.
    Topic: "persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1",
    // The subscription name.
    SubscriptionName: "topic1_sub",
    // The subscription mode.
    Type: pulsar.Shared,
})
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()
Note
You need to provide the full path of the topic in the format of persistent://clusterid/namespace/topic. You can copy the clusterid/namespace/topic part from the Topic page in the console. 
You need to enter the subscription name. You can view the subscription name on the Consumer tab of the topic.
6. Consume messages.
// Obtain messages.
msg, err := consumer.Receive(context.Background())
if err != nil {
    log.Fatal(err)
}
// Simulate business processing.
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
    msg.ID(), string(msg.Payload()))

// If the consumption succeeds, reply ack. If the consumption fails, reply nack or ReconsumeLater based on your business needs.
consumer.Ack(msg)
7. Log in to the TDMQ for Apache Pulsar console. On the Topic page, click the topic name to go to the Consumption Management page. Click the dropdown arrow next to the subscription name to view production and consumption records.


Note
The above is a brief introduction to the publishing and subscription of messages. For more information, see Demo or Pulsar Official Documentation.

Customizing Log File Output

Scenarios

If you do not configure a custom logger when using the Pulsar Go SDK, the SDK uses the standard logrus logger, which outputs logs to os.Stderr by default, as shown in the logrus constructor below:
// It's recommended to make this a global instance called `log`.
func New() *Logger {
    return &Logger{
        Out:          os.Stderr, // The default output.
        Formatter:    new(TextFormatter),
        Hooks:        make(LevelHooks),
        Level:        InfoLevel,
        ExitFunc:     os.Exit,
        ReportCaller: false,
    }
}
Business logs are generally also output to os.Stderr by default. If you do not specify a custom logging library, the Go SDK logs and business logs are mixed together, which makes troubleshooting difficult.

Solution

The Go SDK exposes a logger API on the client, so you can customize the log output format and location and use logging libraries such as logrus and zap. The steps are as follows:
1. Implement the log.Logger API provided by the Pulsar Go SDK by using a custom logging library. The client accepts the implementation through the Logger field of ClientOptions:
// ClientOptions is used to construct a Pulsar Client instance.
type ClientOptions struct {
    // Configure the logger used by the client.
    // By default, a wrapped logrus.StandardLogger will be used, namely,
    // log.NewLoggerWithLogrus(logrus.StandardLogger())
    // FIXME: use `logger` as internal field name instead of `log` as it's more idiomatic
    Logger log.Logger
}
By implementing the log.Logger API with your own logging library, you can redirect the Go SDK logs to a specified location. Taking logrus as an example, the demo below shows how to output the Go SDK logs to a specified file.
package main

import (
    "io"
    "os"

    "github.com/apache/pulsar-client-go/pulsar/log"
    "github.com/sirupsen/logrus"
)

// logrusWrapper implements the log.Logger interface
// based on the underlying logrus.FieldLogger.
type logrusWrapper struct {
    l logrus.FieldLogger
}

// NewLoggerWithLogrus creates a new logger that wraps the given
// logrus.Logger and writes to both stdout and the file at outputPath.
func NewLoggerWithLogrus(logger *logrus.Logger, outputPath string) log.Logger {
    var out io.Writer = os.Stdout
    // Append to the file so that earlier log entries are not overwritten.
    file, err := os.OpenFile(outputPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
    if err != nil {
        // Fall back to stdout only if the file cannot be opened.
        logrus.Errorf("open log file %s failed: %v", outputPath, err)
    } else {
        out = io.MultiWriter(os.Stdout, file)
    }
    logger.SetOutput(out)
    return &logrusWrapper{
        l: logger,
    }
}

func (l *logrusWrapper) SubLogger(fs log.Fields) log.Logger {
    return &logrusWrapper{
        l: l.l.WithFields(logrus.Fields(fs)),
    }
}

func (l *logrusWrapper) WithFields(fs log.Fields) log.Entry {
    return logrusEntry{
        e: l.l.WithFields(logrus.Fields(fs)),
    }
}

func (l *logrusWrapper) WithField(name string, value interface{}) log.Entry {
    return logrusEntry{
        e: l.l.WithField(name, value),
    }
}

func (l *logrusWrapper) WithError(err error) log.Entry {
    return logrusEntry{
        e: l.l.WithError(err),
    }
}

func (l *logrusWrapper) Debug(args ...interface{}) {
    l.l.Debug(args...)
}

func (l *logrusWrapper) Info(args ...interface{}) {
    l.l.Info(args...)
}

func (l *logrusWrapper) Warn(args ...interface{}) {
    l.l.Warn(args...)
}

func (l *logrusWrapper) Error(args ...interface{}) {
    l.l.Error(args...)
}

func (l *logrusWrapper) Debugf(format string, args ...interface{}) {
    l.l.Debugf(format, args...)
}

func (l *logrusWrapper) Infof(format string, args ...interface{}) {
    l.l.Infof(format, args...)
}

func (l *logrusWrapper) Warnf(format string, args ...interface{}) {
    l.l.Warnf(format, args...)
}

func (l *logrusWrapper) Errorf(format string, args ...interface{}) {
    l.l.Errorf(format, args...)
}

type logrusEntry struct {
    e logrus.FieldLogger
}

func (l logrusEntry) WithFields(fs log.Fields) log.Entry {
    return logrusEntry{
        e: l.e.WithFields(logrus.Fields(fs)),
    }
}

func (l logrusEntry) WithField(name string, value interface{}) log.Entry {
    return logrusEntry{
        e: l.e.WithField(name, value),
    }
}

func (l logrusEntry) Debug(args ...interface{}) {
    l.e.Debug(args...)
}

func (l logrusEntry) Info(args ...interface{}) {
    l.e.Info(args...)
}

func (l logrusEntry) Warn(args ...interface{}) {
    l.e.Warn(args...)
}

func (l logrusEntry) Error(args ...interface{}) {
    l.e.Error(args...)
}

func (l logrusEntry) Debugf(format string, args ...interface{}) {
    l.e.Debugf(format, args...)
}

func (l logrusEntry) Infof(format string, args ...interface{}) {
    l.e.Infof(format, args...)
}

func (l logrusEntry) Warnf(format string, args ...interface{}) {
    l.e.Warnf(format, args...)
}

func (l logrusEntry) Errorf(format string, args ...interface{}) {
    l.e.Errorf(format, args...)
}
2. Specify a custom logging library when creating the client.
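client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL:    "pulsar://localhost:6650",
    Logger: NewLoggerWithLogrus(logrus.StandardLogger(), "test.log"),
})
if err != nil {
    logrus.Fatalf("Could not instantiate Pulsar client: %v", err)
}
defer client.Close()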
The above demo writes the Pulsar Go SDK logs to standard output and to the test.log file in the current directory. You can change the output path to redirect the logs to a specified location as needed.

Notes Related to SDK Versions

Community Issues and Optimizations

For Go SDK versions earlier than v0.9.0, upgrade the client to v0.13.1 before you upgrade the broker. For v0.9.0 and later, the client does not need to be upgraded if no issues are encountered.
The following critical issues have been fixed in higher versions:
1. Fixed the reconnection blocking issue, which may cause sending timeouts (Reference Documentation).
2. Fixed the connection leak issue, which may cause a surge in consumers (Reference Documentation).
3. Fixed the issue where cluster changes or node failures may cause consumption blocking, which could previously be resolved only by unloading or restarting consumers (Reference Documentation).
4. Fixed the issue where sending timeouts may be triggered during reconnection (Reference Documentation).
5. Fixed the issue where availablePermits leak causes consumption blocking when messages are discarded (Reference Documentation).
6. Fixed the issue where availablePermitsCh is blocked under specific exception conditions (Reference Documentation).
7. Fixed the issue of synchronous send blocking under specific exception conditions (Reference Documentation).
8. Added support for group acknowledgment, significantly improving client consumption performance (Reference Documentation).
9. Fixed the issue of backoff time failure, where retries were always performed with a 100 ms backoff (Reference Documentation).
10. Fixed the issue where message keys failed to be set in non-batch mode (Reference Documentation).
11. Fixed the issue where all messages in the pending queue were marked as failed upon sending failure (Reference Documentation).
12. Fixed the bugs that caused a large number of duplicate messages, specifically those occurring when delayed messages exceeded server limits or after client disconnection and reconnection (Reference Documentation).
For a complete list of community issues, see Reference Documentation.

Risks in Earlier Versions

For Go SDK versions earlier than v0.9.0, exception handling in extreme scenarios is not comprehensive enough. During broker upgrades, restarts, or network failures, there is a very small probability that an exception occurs while the client reconnects to the server, leading to issues such as sending timeouts or consumption halts. It is strongly recommended that you upgrade the client to v0.13.1 before updating the broker cluster version.
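The version rule above can be expressed as a small check, for example in a pre-upgrade script. This is a sketch under stated assumptions: upgradeBeforeBroker and its helpers are hypothetical, the version string is something you supply yourself (for instance, copied from go.mod), and only plain major.minor.patch versions are handled; pre-release suffixes are rejected.

```go
package main

import (
    "fmt"
    "strconv"
    "strings"
)

// parseVersion turns "v0.13.1" or "0.13.1" into [3]int{0, 13, 1}.
func parseVersion(v string) ([3]int, error) {
    var out [3]int
    parts := strings.Split(strings.TrimPrefix(v, "v"), ".")
    if len(parts) != 3 {
        return out, fmt.Errorf("unexpected version %q", v)
    }
    for i, p := range parts {
        n, err := strconv.Atoi(p)
        if err != nil {
            return out, fmt.Errorf("unexpected version %q", v)
        }
        out[i] = n
    }
    return out, nil
}

// less reports whether a is an earlier version than b.
func less(a, b [3]int) bool {
    for i := range a {
        if a[i] != b[i] {
            return a[i] < b[i]
        }
    }
    return false
}

// upgradeBeforeBroker reports whether the client is older than v0.9.0 and
// should therefore be upgraded before the broker cluster.
func upgradeBeforeBroker(clientVersion string) (bool, error) {
    v, err := parseVersion(clientVersion)
    if err != nil {
        return false, err
    }
    return less(v, [3]int{0, 9, 0}), nil
}

func main() {
    for _, v := range []string{"v0.8.1", "v0.9.0", "v0.13.1"} {
        need, _ := upgradeBeforeBroker(v)
        fmt.Println(v, need) // v0.8.1 true, v0.9.0 false, v0.13.1 false
    }
}
```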

Handling Risks in Earlier Versions

Higher-version clients can reconnect normally during broker upgrades, ensuring minimal impact on business operations. However, if your client SDK cannot be upgraded to the new version, it is recommended that you monitor the client's log output and the production and consumption metrics in the console after upgrading the broker cluster.
If production or consumption becomes stuck, restart the client promptly. The client should resume normal operation after the restart. If the issue persists, submit a ticket for further troubleshooting.



