Install the pulsar-client-go library in the client environment:
go get -u "github.com/apache/pulsar-client-go/pulsar"
import "github.com/apache/pulsar-client-go/pulsar"
// Create a Pulsar client.client, err := pulsar.NewClient(pulsar.ClientOptions{// The address used to access the service.URL: serviceUrl,// The authorized role token.Authentication: pulsar.NewAuthenticationToken(authentication),OperationTimeout: 30 * time.Second,ConnectionTimeout: 30 * time.Second,})if err != nil {log.Fatalf("Could not instantiate Pulsar client: %v", err)}defer client.Close()
Parameter | Description |
serviceUrl | Address used to access the cluster. You can view and copy the address from the Cluster page in the console. |
Authentication | The authorized role token, passed to pulsar.NewAuthenticationToken. |
// Create a producer using the client.producer, err := client.CreateProducer(pulsar.ProducerOptions{// The full topic path in the format of persistent://cluster (tenant) ID/namespace/topic.Topic: "persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1",})if err != nil {log.Fatal(err)}defer producer.Close()
The topic must be given as the full path in the format persistent://clusterid/namespace/topic. You can copy the clusterid/namespace/topic part from the Topic page in the console.
// Send a message._, err = producer.Send(context.Background(), &pulsar.ProducerMessage{// The message content.Payload: []byte("hello go client, this is a message."),// The business key.Key: "yourKey",// The business parameters.Properties: map[string]string{"key": "value"},})
// Create a consumer using the client.consumer, err := client.Subscribe(pulsar.ConsumerOptions{// The full topic path in the format of persistent://cluster (tenant) ID/namespace/topic.Topic: "persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1",// The subscription name.SubscriptionName: "topic1_sub",// The subscription mode.Type: pulsar.Shared,})if err != nil {log.Fatal(err)}defer consumer.Close()
The topic must be given as the full path in the format persistent://clusterid/namespace/topic. You can copy the clusterid/namespace/topic part from the Topic page in the console.
// Obtain messages.msg, err := consumer.Receive(context.Background())if err != nil {log.Fatal(err)}// Simulate business processing.fmt.Printf("Received message msgId: %#v -- content: '%s'\\n",msg.ID(), string(msg.Payload()))// If the consumption succeeds, reply ack. If the consumption fails, reply nack or ReconsumeLater based on your business needs.consumer.Ack(msg)

// It's recommended to make this a global instance called `log`.func New() *Logger {return &Logger{Out: os.Stderr, // The default output.Formatter: new(TextFormatter),Hooks: make(LevelHooks),Level: InfoLevel,ExitFunc: os.Exit,ReportCaller: false,}}
os.Stderr by default. If you do not specify a custom logging library, the Go SDK logs and business logs will be mixed, making it difficult for troubleshooting.// ClientOptions is used to construct a Pulsar Client instance.type ClientOptions struct {// Configure the logger used by the client.// By default, a wrapped logrus.StandardLogger will be used, namely,// log.NewLoggerWithLogrus(logrus.StandardLogger())// FIXME: use `logger` as internal field name instead of `log` as it's more idiomaticLogger log.Logger}
logger API to specify a custom logging library so that you can redirect logs to a specified location. Taking logrus as an example, the demo below shows how to specify a custom logging library to output the Go SDK logs to a specified file.package mainimport ("fmt""io""os""github.com/apache/pulsar-client-go/pulsar/log""github.com/sirupsen/logrus")// logrusWrapper implements Logger interface// based on underlying logrus.FieldLoggertype logrusWrapper struct {l logrus.FieldLogger}// NewLoggerWithLogrus creates a new logger which wraps// the given logrus.Loggerfunc NewLoggerWithLogrus(logger *logrus.Logger, outputPath string) log.Logger {writer1 := os.Stdoutwriter2, err := os.OpenFile(outputPath, os.O_WRONLY|os.O_CREATE, 0755)if err != nil {logrus.Error("create file log.txt failed: %v", err)}logger.SetOutput(io.MultiWriter(writer1, writer2))return &logrusWrapper{l: logger,}}func (l *logrusWrapper) SubLogger(fs log.Fields) log.Logger {return &logrusWrapper{l: l.l.WithFields(logrus.Fields(fs)),}}func (l *logrusWrapper) WithFields(fs log.Fields) log.Entry {return logrusEntry{e: l.l.WithFields(logrus.Fields(fs)),}}func (l *logrusWrapper) WithField(name string, value interface{}) log.Entry {return logrusEntry{e: l.l.WithField(name, value),}}func (l *logrusWrapper) WithError(err error) log.Entry {return logrusEntry{e: l.l.WithError(err),}}func (l *logrusWrapper) Debug(args ...interface{}) {l.l.Debug(args...)}func (l *logrusWrapper) Info(args ...interface{}) {l.l.Info(args...)}func (l *logrusWrapper) Warn(args ...interface{}) {l.l.Warn(args...)}func (l *logrusWrapper) Error(args ...interface{}) {l.l.Error(args...)}func (l *logrusWrapper) Debugf(format string, args ...interface{}) {l.l.Debugf(format, args...)}func (l *logrusWrapper) Infof(format string, args ...interface{}) {l.l.Infof(format, args...)}func (l *logrusWrapper) Warnf(format string, args ...interface{}) {l.l.Warnf(format, args...)}func (l *logrusWrapper) Errorf(format string, args ...interface{}) {l.l.Errorf(format, 
args...)}type logrusEntry struct {e logrus.FieldLogger}func (l logrusEntry) WithFields(fs log.Fields) log.Entry {return logrusEntry{e: l.e.WithFields(logrus.Fields(fs)),}}func (l logrusEntry) WithField(name string, value interface{}) log.Entry {return logrusEntry{e: l.e.WithField(name, value),}}func (l logrusEntry) Debug(args ...interface{}) {l.e.Debug(args...)}func (l logrusEntry) Info(args ...interface{}) {l.e.Info(args...)}func (l logrusEntry) Warn(args ...interface{}) {l.e.Warn(args...)}func (l logrusEntry) Error(args ...interface{}) {l.e.Error(args...)}func (l logrusEntry) Debugf(format string, args ...interface{}) {l.e.Debugf(format, args...)}func (l logrusEntry) Infof(format string, args ...interface{}) {l.e.Infof(format, args...)}func (l logrusEntry) Warnf(format string, args ...interface{}) {l.e.Warnf(format, args...)}func (l logrusEntry) Errorf(format string, args ...interface{}) {l.e.Errorf(format, args...)}
client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: "pulsar://localhost:6650",
	// Pass the logrus standard logger. In the demo, `log` is the
	// pulsar-client-go log package, which has no StandardLogger function.
	Logger: NewLoggerWithLogrus(logrus.StandardLogger(), "test.log"),
})
Feedback