go get github.com/apache/rocketmq-client-go/v2
// Service access address. (Note: Add "http://" or "https://" before the access address; otherwise, it cannot be resolved.)var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"// Authorized role namevar secretKey = "admin"// Authorized role tokenvar accessKey = "eyJrZXlJZC...."// Full namespace namevar nameSpace = "rocketmq-xxx|namespace_go"// Producer group namevar groupName = "group1"// Create a message producerp, _ := rocketmq.NewProducer(// Set the service addressproducer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),// Set ACL permissionsproducer.WithCredentials(primitive.Credentials{SecretKey: secretKey,AccessKey: accessKey,}),// Set the producer groupproducer.WithGroupName(groupName),// Set the namespace nameproducer.WithNamespace(nameSpace),// Set the number of retries upon sending failuresproducer.WithRetry(2),)// Start the producererr := p.Start()if err != nil {fmt.Printf("start producer error: %s", err.Error())os.Exit(1)}
Parameter | Description |
secretKey | Authorized role name. |
accessKey | Authorized role token. |
nameSpace | Full namespace name, which can be copied under the Topic tab on the Cluster page in the console and is in the format of cluster ID + | + namespace. |
serverAddress | Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. (Note: Add http:// or https:// before the access address; otherwise, it cannot be resolved.) |
groupName | Producer group name, which can be copied under the Group tab in the console. |
// Topic namevar topicName = "topic1"// Construct the message contentmsg := &primitive.Message{Topic: topicName, // Set the topic nameBody: []byte("Hello RocketMQ Go Client! This is a new message."),}// Set the tagmsg.WithTag("TAG")// Set the keymsg.WithKeys([]string{"yourKey"})// Send the messageres, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("send message error: %s\\n", err)} else {fmt.Printf("send message success: result=%s\\n", res.String())}
Parameter | Description |
topicName | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
TAG | Message tag. |
yourKey | Message key. |
// Close the producererr = p.Shutdown()if err != nil {fmt.Printf("shutdown producer error: %s", err.Error())}
// Service access address. (Note: Add "http://" or "https://" before the access address; otherwise, it cannot be resolved.)var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"// Authorized role namevar secretKey = "admin"// Authorized role tokenvar accessKey = "eyJrZXlJZC...."// Full namespace namevar nameSpace = "rocketmq-xxx|namespace_go"// Producer group namevar groupName = "group11"// Create a consumerc, err := rocketmq.NewPushConsumer(// Set the consumer groupconsumer.WithGroupName(groupName),// Set the service addressconsumer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),// Set ACL permissionsconsumer.WithCredentials(primitive.Credentials{SecretKey: secretKey,AccessKey: accessKey,}),// Set the namespace nameconsumer.WithNamespace(nameSpace),// Set consumption from the start offsetconsumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),// Set the consumption mode (cluster consumption by default)consumer.WithConsumerModel(consumer.Clustering),)if err != nil {fmt.Println("init consumer2 error: " + err.Error())os.Exit(0)}
Parameter | Description |
secretKey | Authorized role name. |
accessKey | Authorized role token. |
nameSpace | Full namespace name, which can be copied under the Topic tab on the Cluster page in the console and is in the format of cluster ID + | + namespace. |
serverAddress | Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. (Note: Add http:// or https:// before the access address; otherwise, it cannot be resolved.) |
groupName | Consumer group name, which can be copied under the Group tab in the console. |
// Topic namevar topicName = "topic1"// Set the tag of messages that are subscribed toselector := consumer.MessageSelector{Type: consumer.TAG,Expression: "TagA || TagC",}// Set the delay level of consumption retry. A total of 18 levels can be set. Below is the relationship between each delay level and the delay time.// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18// 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2hdelayLevel := 1err = c.Subscribe(topicName, selector, func(ctx context.Context,msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {fmt.Printf("subscribe callback len: %d \\n", len(msgs))// Set the delay level for the next consumptionconcurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)concurrentCtx.DelayLevelWhenNextConsume = delayLevel // only run when return consumer.ConsumeRetryLaterfor _, msg := range msgs {// Simulate a successful consumption after three retriesif msg.ReconsumeTimes > 3 {fmt.Printf("msg ReconsumeTimes > 3. msg: %v", msg)return consumer.ConsumeSuccess, nil} else {fmt.Printf("subscribe callback: %v \\n", msg)}}// Simulate a consumption failure. Retry is required.return consumer.ConsumeRetryLater, nil})if err != nil {fmt.Println(err.Error())}
Parameter | Description |
topicName | Topic name, which can be copied on the Topic page in the console. |
Expression | Tag expression of the messages to subscribe to. Separate multiple tags with `||`, for example `TagA || TagC`. |
delayLevel | A parameter used to set the delay level of consumption retry. A total of 18 delay levels are supported. |
// Start consumptionerr = c.Start()if err != nil {fmt.Println(err.Error())os.Exit(-1)}time.Sleep(time.Hour)// Release the resourceserr = c.Shutdown()if err != nil {fmt.Printf("shundown Consumer error: %s", err.Error())}
Was this page helpful?