Release Notes
Announcements
go get github.com/apache/rocketmq-client-go/v2
syncSendMessage.go file.// Service access address. (Note: You need to add http:// or https:// before the access address; otherwise, it cannot be resolved.)var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"// Authorized role name.var secretKey = "admin"// Authorized role token.var accessKey = "eyJrZXlJZC...."// Full namespace name.var nameSpace = "MQ_INST_rocketmqem4xxxx"// Producer group name.var groupName = "group1"// Create a message producer.p, _ := rocketmq.NewProducer(// Set the service address.producer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),// Set the ACL permissions.producer.WithCredentials(primitive.Credentials{SecretKey: secretKey,AccessKey: accessKey,}),// Set the producer group.producer.WithGroupName(groupName),// Set the namespace name.producer.WithNamespace(nameSpace),// Set the number of retries when the sending fails.producer.WithRetry(2),)// Start the producer.err := p.Start()if err != nil {fmt.Printf("start producer error: %s", err.Error())os.Exit(1)}
// Topic name.var topicName = "topic1"// Producer group name.var groupName = "group1"// Create a message producer.p, _ := rocketmq.NewProducer(// Set the service address.producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"})),// Set the ACL permissions.producer.WithCredentials(primitive.Credentials{SecretKey: "admin",AccessKey: "eyJrZXlJZC......",}),// Set the producer group.producer.WithGroupName(groupName),// Set the namespace name.producer.WithNamespace("rocketmq-xxx|namespace_go"),// Set the number of retries when the sending fails.producer.WithRetry(2),)// Start the producer.err := p.Start()if err != nil {fmt.Printf("start producer error: %s", err.Error())os.Exit(1)}for i := 0; i < 1; i++ {msg := primitive.NewMessage(topicName, []byte("Hello RocketMQ Go Client! This is a delay message."))// Set the delay level.// Level and time correspondence:// 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, and 2h;// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18// To use the delay level, set the following method.msg.WithDelayTimeLevel(3)// To use any delayed message, set the following method and do not set WithDelayTimeLevel. Delay is expressed in milliseconds. In the following case, delivery is implemented in 10s.delayMills := int64(10 * 1000)msg.WithProperty("__STARTDELIVERTIME", strconv.FormatInt(time.Now().UnixMilli()+delayMills, 10))// Send messages.res, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("send message error: %s\\n", err)} else {fmt.Printf("send message success: result=%s\\n", res.String())}}// Release resources.err = p.Shutdown()if err != nil {fmt.Printf("shutdown producer error: %s", err.Error())}
Parameter | Description |
secretKey | Role name. You can copy the role name from the SecretKey column on the Cluster Permissions page in the console. |
accessKey | Role token. You can copy the token from the AccessKey column on the Cluster Permissions page in the console. |
nameSpace | Namespace name. You can copy the name from the Namespace page in the console. If you are using a 4.x general cluster or a 5.x cluster, leave this parameter blank. |
serverAddress | Cluster access address. You can obtain the access address from the Access Information module on the cluster basic information page in the console. (Note: You need to add http:// or https:// before the access address; otherwise, it cannot be resolved.) |
groupName | Producer group name. You can copy the name from the Group Management page in the console. |
// Topic name.var topicName = "topic1"// Construct the message content.msg := &primitive.Message{Topic: topicName, // Set the topic name.Body: []byte("Hello RocketMQ Go Client! This is a new message."),}// Set the tag.msg.WithTag("TAG")// Set the key.msg.WithKeys([]string{"yourKey"})// Send messages.res, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("send message error: %s\\n", err)} else {fmt.Printf("send message success: result=%s\\n", res.String())}
Parameter | Description |
topicName | Topic name. You can copy the name from the Topic Management page in the console. |
TAG | Message tag identifier. |
yourKey | Message business key. |
// Close the producer.err = p.Shutdown()if err != nil {fmt.Printf("shutdown producer error: %s", err.Error())}
// Service access address. (Note: You need to add http:// or https:// before the access address; otherwise, it cannot be resolved.)var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"// Authorized role name.var secretKey = "admin"// Authorized role token.var accessKey = "eyJrZXlJZC...."// Full namespace name.var nameSpace = "rocketmq-xxx|namespace_go"// Producer group name.var groupName = "group11"// Create a consumer.c, err := rocketmq.NewPushConsumer(// Set the consumer group.consumer.WithGroupName(groupName),// Set the service address.consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),// Set the ACL permissions.consumer.WithCredentials(primitive.Credentials{SecretKey: secretKey,AccessKey: accessKey,}),// Set the namespace name.consumer.WithNamespace(nameSpace),// Set consumption from the starting position.consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),// Set the consumption mode (clustering mode by default).consumer.WithConsumerModel(consumer.Clustering),//For broadcasting consumption, set the instance name. Set it to the system name of the application; otherwise, pid is used, which causes repeated consumption after a restart.consumer.WithInstance("xxxx"),)if err != nil {fmt.Println("init consumer2 error: " + err.Error())os.Exit(0)}
Parameter | Description |
secretKey | Role name. You can copy the role name from the SecretKey column on the Cluster Permissions page in the console. |
accessKey | Role token. You can copy the token from the AccessKey column on the Cluster Permissions page in the console. |
nameSpace | Namespace name. You can copy the name from the Namespace page in the console. If you are using a 4.x general cluster or a 5.x cluster, specify the cluster ID for this parameter. |
serverAddress | Cluster access address. You can obtain the access address from the Access Information module on the cluster basic information page in the console. (Note: You need to add http:// or https:// before the access address; otherwise, it cannot be resolved.) |
groupName | Producer group name. You can copy the name from the Group Management page in the console. |
// Topic name.var topicName = "topic1"// Set the tag for subscribed messages.selector := consumer.MessageSelector{Type: consumer.TAG,Expression: "TagA || TagC",}// Set the reconsumption delay level. 18 delay levels are supported. Below is the mapping between delay levels and delay times.// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18// 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2hdelayLevel := 1err = c.Subscribe(topicName, selector, func(ctx context.Context,msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {fmt.Printf("subscribe callback len: %d \\n", len(msgs))// Set the delay level for the next consumption.concurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)concurrentCtx.DelayLevelWhenNextConsume = delayLevel // only run when return consumer.ConsumeRetryLaterfor _, msg := range msgs {// Simulate successful consumption after 3 retries.if msg.ReconsumeTimes > 3 {fmt.Printf("msg ReconsumeTimes > 3. msg: %v", msg)return consumer.ConsumeSuccess, nil} else {fmt.Printf("subscribe callback: %v \\n", msg)}}// Simulate consumption failure and return consumption retry to the server.return consumer.ConsumeRetryLater, nil})if err != nil {fmt.Println(err.Error())}
Parameter | Description |
topicName | Topic name. You can copy the name from the Topic Management page in the console. |
Expression | Message tag identifier. |
delayLevel | Set the reconsumption delay level. 18 delay levels are supported. |
// Start consumption.err = c.Start()if err != nil {fmt.Println(err.Error())os.Exit(-1)}time.Sleep(time.Hour)// Release resources.err = c.Shutdown()if err != nil {fmt.Printf("shundown Consumer error: %s", err.Error())}
Was this page helpful?
You can also Contact sales or Submit a Ticket for help.
Help us improve! Rate your documentation experience in 5 mins.
Feedback