go get github.com/apache/rocketmq-client-go/v2
syncSendMessage.go file.// Service access address (Note: Add “http://” or “https://” before the access address; otherwise, it cannot be resolved)var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"// Authorize the role namevar secretKey = "admin"// Authorize the role tokenvar accessKey = "eyJrZXlJZC...."// Full namespace namevar nameSpace = "MQ_INST_rocketmqem4xxxx"// Producer group namevar groupName = "group1"// Create a message producerp, _ := rocketmq.NewProducer(// Set the service addressproducer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),// Set ACL permissionsproducer.WithCredentials(primitive.Credentials{SecretKey: secretKey,AccessKey: accessKey,}),// Set the producer groupproducer.WithGroupName(groupName),// Set the namespace nameproducer.WithNamespace(nameSpace),// Set the number of retries upon sending failuresproducer.WithRetry(2),)// Start the producererr := p.Start()if err != nil {fmt.Printf("start producer error: %s", err.Error())os.Exit(1)}
// Topic namevar topicName = "topic1"// Producer group namevar groupName = "group1"// Create a message producerp, _ := rocketmq.NewProducer(// Set the service addressproducer.WithNsResolver(primitive.NewPassthroughResolver([]string{"https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"})),// Set ACL permissionsproducer.WithCredentials(primitive.Credentials{SecretKey: "admin",AccessKey: "eyJrZXlJZC......",}),// Set the producer groupproducer.WithGroupName(groupName),// Set the namespace nameproducer.WithNamespace("rocketmq-xxx|namespace_go"),// Set the number of retries upon sending failuresproducer.WithRetry(2),)// Start the producererr := p.Start()if err != nil {fmt.Printf("start producer error: %s", err.Error())os.Exit(1)}for i := 0; i < 1; i++ {msg := primitive.NewMessage(topicName, []byte("Hello RocketMQ Go Client! This is a delay message."))// Set delay level// The relationship between the delay level and the delay time:// 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h;// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18// If you want to use the delay level, then set the following method:msg.WithDelayTimeLevel(3)// If you want to use any delayed message, then set the following method without setting `WithDelayTimeLevel`. The unit is milliseconds. The following shows that a delayed message is delivered after 10 seconds.delayMills := int64(10 * 1000)msg.WithProperty("__STARTDELIVERTIME", strconv.FormatInt(time.Now().Unix()+delayMills, 10))// Send the messageres, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("send message error: %s\\n", err)} else {fmt.Printf("send message success: result=%s\\n", res.String())}}// Release resourceserr = p.Shutdown()if err != nil {fmt.Printf("shutdown producer error: %s", err.Error())}
Parameter | Description |
secretKey | Role name, copy from the SecretKey column on the Role Management page in the console. |
accessKey | Role token, copy from the AccessKey column on the Role Management page in the console. ![]() |
nameSpace | Namespace name, which can be copied on the Namespace page in the console. ![]() |
serverAddress | Obtain the cluster access address from the access information module on the console cluster basic information page. (Note: add http:// or https:// before the access address, otherwise it cannot be parsed). ![]() |
groupName | Copy the producer group name from the Group Management page on the console. ![]() |
// Topic namevar topicName = "topic1"// Configure message contentmsg := &primitive.Message{Topic: topicName, // Set the topic nameBody: []byte("Hello RocketMQ Go Client! This is a new message."),}// Set tagsmsg.WithTag("TAG")// Set keysmsg.WithKeys([]string{"yourKey"})// Send the messageres, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("send message error: %s\\n", err)} else {fmt.Printf("send message success: result=%s\\n", res.String())}
Parameter | Description |
topicName | Topic name, which can be copied under the Topic tab on the cluster details page in the console. ![]() |
TAG | Message tag identifier |
yourKey | Message business key |
// Disable the producererr = p.Shutdown()if err != nil {fmt.Printf("shutdown producer error: %s", err.Error())}
// Service access address (Note: Add “http://” or “https://” before the access address; otherwise, it cannot be resolved)var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"// Authorize the role namevar secretKey = "admin"// Authorize the role tokenvar accessKey = "eyJrZXlJZC...."// Full namespace namevar nameSpace = "rocketmq-xxx|namespace_go"// Producer group namevar groupName = "group11"// Create a consumerc, err := rocketmq.NewPushConsumer(// Set the consumer groupconsumer.WithGroupName(groupName),// Set the service addressconsumer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),// Set ACL permissionsconsumer.WithCredentials(primitive.Credentials{SecretKey: secretKey,AccessKey: accessKey,}),// Set the namespace nameconsumer.WithNamespace(nameSpace),// Set consumption from the start offsetconsumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),// Set the consumption mode (cluster consumption by default)consumer.WithConsumerModel(consumer.Clustering),//For broadcasting consumption, set the instance name to the system name of the application. If the instance name is not set, the pid will be used, which will cause a restart for repeated consumptionconsumer.WithInstance("xxxx"),)if err != nil {fmt.Println("init consumer2 error: " + err.Error())os.Exit(0)}
Parameter | Description |
secretKey | Role name, copy from the SecretKey column on the Role Management page in the console. |
accessKey | Role token, copy from the AccessKey column on the Role Management page in the console. ![]() |
nameSpace | Copy the namespace name from the Namespace page in the console. If you use a 4.x generic cluster or 5.x cluster, fill in the Cluster ID here. ![]() |
serverAddress | Obtain the cluster access address from the access information module on the console cluster basic information page. (Note: add http:// or https:// before the access address, otherwise it cannot be parsed). ![]() |
groupName | Copy the producer group name from the Group Management page on the console. ![]() |
// Topic namevar topicName = "topic1"// Set the tag of messages that are subscribed toselector := consumer.MessageSelector{Type: consumer.TAG,Expression: "TagA || TagC",}// Set the delay level of consumption retry. A total of 18 levels can be set. Below is the relationship between each delay level and the delay time.// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18// 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2hdelayLevel := 1err = c.Subscribe(topicName, selector, func(ctx context.Context,msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {fmt.Printf("subscribe callback len: %d \\n", len(msgs))// Set the delay level for the next consumptionconcurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)concurrentCtx.DelayLevelWhenNextConsume = delayLevel // only run when return consumer.ConsumeRetryLaterfor _, msg := range msgs {// Simulate a successful consumption after three retriesif msg.ReconsumeTimes > 3 {fmt.Printf("msg ReconsumeTimes > 3. msg: %v", msg)return consumer.ConsumeSuccess, nil} else {fmt.Printf("subscribe callback: %v \\n", msg)}}// Simulate a consumption failure. Retry is required.return consumer.ConsumeRetryLater, nil})if err != nil {fmt.Println(err.Error())}
Parameter | Description |
topicName | Topic name, which can be copied on the Topic page in the console. |
Expression | Message tag identifier |
delayLevel | A parameter used to set the delay level of consumption retry. A total of 18 delay levels are supported. |
// Start consumptionerr = c.Start()if err != nil {fmt.Println(err.Error())os.Exit(-1)}time.Sleep(time.Hour)// Release resourceserr = c.Shutdown()if err != nil {fmt.Printf("shundown Consumer error: %s", err.Error())}

フィードバック