Use of Go SDK

Last updated: 2024-01-17 16:56:46

    Overview

    This document uses the Go SDK as an example to describe how to send and receive messages with an open-source SDK, so that you can better understand the complete process of message sending and receiving.

    Prerequisites

    Directions:

    1. Execute the following command in the client environment to download the relevant RocketMQ client dependencies.
    go get github.com/apache/rocketmq-client-go/v2
    2. Create a producer in the corresponding method. To send general messages, modify the corresponding parameters in the syncSendMessage.go file.
    Currently, delayed messages support a delay of arbitrary precision, independent of the delay levels.
    The general message example comes first, followed by the delayed message example.
    // Packages used by this snippet
    import (
        "fmt"
        "os"

        "github.com/apache/rocketmq-client-go/v2"
        "github.com/apache/rocketmq-client-go/v2/primitive"
        "github.com/apache/rocketmq-client-go/v2/producer"
    )

    // Service access address (Note: http:// or https:// must be prepended to the access address. Otherwise, it cannot be parsed.)
    var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080"
    // Authorized role name
    var secretKey = "admin"
    // Key of the authorized role
    var accessKey = "eyJrZXlJZC...."
    // Producer group name
    var groupName = "group1"
    // Create a message producer
    p, err := rocketmq.NewProducer(
        // Set the service address
        producer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),
        // Set ACL permissions
        producer.WithCredentials(primitive.Credentials{
            SecretKey: secretKey,
            AccessKey: accessKey,
        }),
        // Set the producer group
        producer.WithGroupName(groupName),
        // Set the number of retries upon sending failures
        producer.WithRetry(2),
    )
    if err != nil {
        fmt.Printf("create producer error: %s\n", err.Error())
        os.Exit(1)
    }
    // Start the producer
    err = p.Start()
    if err != nil {
        fmt.Printf("start producer error: %s", err.Error())
        os.Exit(1)
    }
    Delayed messages:
    // Packages used by this example
    import (
        "context"
        "fmt"
        "os"
        "strconv"
        "time"

        "github.com/apache/rocketmq-client-go/v2"
        "github.com/apache/rocketmq-client-go/v2/primitive"
        "github.com/apache/rocketmq-client-go/v2/producer"
    )

    // Topic name
    var topicName = "topic1"
    // Producer group name
    var groupName = "group1"
    // Create a message producer
    p, err := rocketmq.NewProducer(
        // Set the service address
        producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080"})),
        // Set ACL permissions
        producer.WithCredentials(primitive.Credentials{
            SecretKey: "admin",
            AccessKey: "eyJrZXlJZC......",
        }),
        // Set the producer group
        producer.WithGroupName(groupName),
        // Set the number of retries upon sending failures
        producer.WithRetry(2),
    )
    if err != nil {
        fmt.Printf("create producer error: %s\n", err.Error())
        os.Exit(1)
    }
    // Start the producer
    err = p.Start()
    if err != nil {
        fmt.Printf("start producer error: %s", err.Error())
        os.Exit(1)
    }

    for i := 0; i < 1; i++ {
        msg := primitive.NewMessage(topicName, []byte("Hello RocketMQ Go Client! This is a delay message."))
        // Both ways of setting the delay are shown for reference. Keep only one of them in real code.
        // Option 1: use a delay level.
        // Relationship between level and time:
        // 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h;
        // 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
        msg.WithDelayTimeLevel(3)
        // Option 2: use an arbitrary delay and leave WithDelayTimeLevel unconfigured.
        // The value is a timestamp in milliseconds. The following settings indicate that the delivery takes place after 10 seconds.
        delayMills := int64(10 * 1000)
        // time.Now().UnixMilli() (Go 1.17 or later) keeps both operands in milliseconds
        msg.WithProperty("__STARTDELIVERTIME", strconv.FormatInt(time.Now().UnixMilli()+delayMills, 10))

        // Send the message
        res, err := p.SendSync(context.Background(), msg)
        if err != nil {
            fmt.Printf("send message error: %s\n", err)
        } else {
            fmt.Printf("send message success: result=%s\n", res.String())
        }
    }

    // Release resources
    err = p.Shutdown()
    if err != nil {
        fmt.Printf("shutdown producer error: %s", err.Error())
    }
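The two ways of specifying a delay in the example above can be sketched with the standard library alone. This is a minimal illustration, not part of the SDK: delayForLevel and startDeliverTime are hypothetical helper names, the level table is copied from the comments in the example, and the sketch assumes that __STARTDELIVERTIME expects an absolute Unix timestamp in milliseconds, as the comment in the example suggests.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// delayLevels mirrors the level table quoted in the example:
// level 1 is 1s, level 18 is 2h.
var delayLevels = []time.Duration{
	1 * time.Second, 5 * time.Second, 10 * time.Second, 30 * time.Second,
	1 * time.Minute, 2 * time.Minute, 3 * time.Minute, 4 * time.Minute,
	5 * time.Minute, 6 * time.Minute, 7 * time.Minute, 8 * time.Minute,
	9 * time.Minute, 10 * time.Minute, 20 * time.Minute, 30 * time.Minute,
	1 * time.Hour, 2 * time.Hour,
}

// delayForLevel returns the delay for a 1-based delay level.
// Hypothetical helper for illustration only.
func delayForLevel(level int) (time.Duration, error) {
	if level < 1 || level > len(delayLevels) {
		return 0, fmt.Errorf("delay level %d out of range 1-%d", level, len(delayLevels))
	}
	return delayLevels[level-1], nil
}

// startDeliverTime adds delay to nowMillis (a Unix timestamp in milliseconds)
// and formats the result as a decimal string.
// Assumption: the property value is an absolute millisecond timestamp.
func startDeliverTime(nowMillis int64, delay time.Duration) string {
	return strconv.FormatInt(nowMillis+delay.Milliseconds(), 10)
}

func main() {
	d, err := delayForLevel(3)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("level 3 delay:", d)

	// Value that would be passed to msg.WithProperty("__STARTDELIVERTIME", ...).
	fmt.Println("deliver at:", startDeliverTime(time.Now().UnixMilli(), 10*time.Second))
}
```

Passing the current time in as a parameter keeps the timestamp calculation deterministic, which makes the unit handling easy to check in isolation.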
    3. Send messages in the same way as above (synchronous sending is used as an example).
    // Additional import needed by this snippet: "context"
    // Topic name
    var topicName = "topic1"
    // Construct message content
    msg := &primitive.Message{
        Topic: topicName, // Set the topic name
        Body:  []byte("Hello RocketMQ Go Client! This is a new message."),
    }
    // Set the tag
    msg.WithTag("TAG")
    // Set the key
    msg.WithKeys([]string{"yourKey"})
    // Send the message
    res, err := p.SendSync(context.Background(), msg)
    if err != nil {
        fmt.Printf("send message error: %s\n", err)
    } else {
        fmt.Printf("send message success: result=%s\n", res.String())
    }
    Parameter     Description
    topicName     Topic name, which can be copied from the Topic tab on the Cluster page in the console.
    TAG           Message tag identifier.
    yourKey       Business message key.
    Release the resources.
    // Shut down the producer
    err = p.Shutdown()
    if err != nil {
        fmt.Printf("shutdown producer error: %s", err.Error())
    }
    Note:
    For more information on asynchronous sending and one-way sending, see the demo or RocketMQ-Client-Go Examples.
    4. Create a consumer.
    // Additional import needed by this snippet: "github.com/apache/rocketmq-client-go/v2/consumer"
    // Service access address (Note: http:// or https:// must be prepended to the access address. Otherwise, it cannot be parsed.)
    var serverAddress = "http://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080"
    // Authorized role name
    var secretKey = "admin"
    // Key of the authorized role
    var accessKey = "eyJrZXlJZC...."
    // Consumer group name
    var groupName = "group11"
    // Create a consumer
    c, err := rocketmq.NewPushConsumer(
        // Set the consumer group
        consumer.WithGroupName(groupName),
        // Set the service address
        consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),
        // Set ACL permissions
        consumer.WithCredentials(primitive.Credentials{
            SecretKey: secretKey,
            AccessKey: accessKey,
        }),
        // Set consumption from the start offset
        consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
        // Set the consumption mode (cluster mode by default)
        consumer.WithConsumerModel(consumer.Clustering),
        // For broadcasting consumption, set the instance name to the system name of the application. If it is not set, the PID will be used, which can cause duplicate consumption upon restart.
        consumer.WithInstance("xxxx"),
    )
    if err != nil {
        fmt.Println("init consumer error: " + err.Error())
        // Exit with a non-zero status so that the failure is visible to the caller
        os.Exit(1)
    }
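The comment on the service access address says that the address cannot be parsed unless it starts with http:// or https://. A minimal sketch of checking this before the address is handed to the client is shown here. validateServerAddress is a hypothetical helper, not an SDK function, and it only checks the form of the address, not whether the endpoint is reachable.

```go
package main

import (
	"fmt"
	"net/url"
)

// validateServerAddress returns an error unless addr uses the http or https
// scheme and names a host. Hypothetical helper for illustration only.
func validateServerAddress(addr string) error {
	u, err := url.Parse(addr)
	if err != nil {
		return fmt.Errorf("parse %q: %w", addr, err)
	}
	if u.Scheme != "http" && u.Scheme != "https" {
		return fmt.Errorf("address %q must start with http:// or https://", addr)
	}
	if u.Host == "" {
		return fmt.Errorf("address %q has no host", addr)
	}
	return nil
}

func main() {
	addrs := []string{
		"https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080",
		"rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080", // scheme missing
	}
	for _, addr := range addrs {
		if err := validateServerAddress(addr); err != nil {
			fmt.Println("invalid:", err)
			continue
		}
		fmt.Println("ok:", addr)
	}
}
```

Running a check like this at startup turns a missing scheme into an explicit error message instead of a resolver failure later on.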
    5. Subscribe to messages.
    // Topic name
    var topicName = "topic1"
    // Set the tag of messages that are subscribed to
    selector := consumer.MessageSelector{
        Type:       consumer.TAG,
        Expression: "TagA || TagC",
    }
    // Define the delay level for retrying consumption. There are 18 delay levels in total. The following is the relationship between delay levels and delay time.
    // 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
    // 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
    delayLevel := 1
    err = c.Subscribe(topicName, selector, func(ctx context.Context,
        msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
        fmt.Printf("subscribe callback len: %d \n", len(msgs))
        // Set the delay level for the next consumption.
        // It only takes effect when consumer.ConsumeRetryLater is returned.
        if concurrentCtx, ok := primitive.GetConcurrentlyCtx(ctx); ok {
            concurrentCtx.DelayLevelWhenNextConsume = delayLevel
        }

        for _, msg := range msgs {
            // Simulate a successful consumption once the message has been retried more than three times
            if msg.ReconsumeTimes > 3 {
                fmt.Printf("msg ReconsumeTimes > 3. msg: %v", msg)
                return consumer.ConsumeSuccess, nil
            } else {
                fmt.Printf("subscribe callback: %v \n", msg)
            }
        }
        // Simulate a consumption failure and respond with a retry
        return consumer.ConsumeRetryLater, nil
    })
    if err != nil {
        fmt.Println(err.Error())
    }
    Parameter     Description
    topicName     The name of the topic, copied from the Topic page on the console.
    Expression    Message tag identifier.
    delayLevel    Configure the delay level for re-consumption. A total of 18 delay levels are supported.
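To make the meaning of a tag expression such as "TagA || TagC" concrete, the following sketch shows the matching rule it is assumed to describe: a message is selected when its tag equals any of the tags separated by "||", and "*" or an empty expression selects every tag. matchesTagExpression is a hypothetical helper written for illustration. It is not the SDK's implementation, and application code does not need to do this filtering itself.

```go
package main

import (
	"fmt"
	"strings"
)

// matchesTagExpression reports whether tag is selected by expr.
// expr is either "*" (or empty) for all tags, or tags joined by "||".
// Hypothetical helper that only illustrates the assumed matching rule.
func matchesTagExpression(expr, tag string) bool {
	expr = strings.TrimSpace(expr)
	if expr == "" || expr == "*" {
		return true
	}
	for _, candidate := range strings.Split(expr, "||") {
		if strings.TrimSpace(candidate) == tag {
			return true
		}
	}
	return false
}

func main() {
	expr := "TagA || TagC"
	for _, tag := range []string{"TagA", "TagB", "TagC"} {
		fmt.Printf("%s selected by %q: %v\n", tag, expr, matchesTagExpression(expr, tag))
	}
}
```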
    6. Consume messages (consumption must start after the subscription is set up).
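    // Additional import needed by this snippet: "time"
    // Initiate consumption
    err = c.Start()
    if err != nil {
        fmt.Println(err.Error())
        os.Exit(-1)
    }
    // Keep the process alive so that the consumer can receive messages
    time.Sleep(time.Hour)
    // Release resources
    err = c.Shutdown()
    if err != nil {
        fmt.Printf("shutdown consumer error: %s", err.Error())
    }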
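The example above keeps the consumer alive with a fixed one-hour sleep. In a long-running service, a common alternative is to wait for a termination signal and then release resources. The sketch below shows that pattern with the standard library only. waitForStop is a hypothetical helper, the two-second limit exists only so that the demo terminates on its own, and the comments mark where c.Start() and c.Shutdown() from the example would go.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// waitForStop blocks until a signal arrives on sigCh or maxWait elapses,
// and returns a short description of what ended the wait.
// Hypothetical helper for illustration only.
func waitForStop(sigCh <-chan os.Signal, maxWait time.Duration) string {
	select {
	case sig := <-sigCh:
		return "signal: " + sig.String()
	case <-time.After(maxWait):
		return "timeout"
	}
}

func main() {
	// Ask the runtime to deliver SIGINT (Ctrl+C) and SIGTERM to sigCh.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigCh)

	// In the consumer example, c.Start() would be called here.
	fmt.Println("consuming, press Ctrl+C to stop")

	reason := waitForStop(sigCh, 2*time.Second)
	fmt.Println("stopping:", reason)

	// In the consumer example, c.Shutdown() would be called here.
}
```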
    7. Check consumption details. After a message is sent, a message ID (messageID) is returned. You can query recently sent messages on the Message Query page, as shown in the following figure, and view the details and trace of a specific message. For details, see the Message Query section.
    
    
    Note:
    This document briefly describes sending and receiving messages using the Golang client. For more operations, see the demo or the RocketMQ-Client-Go Examples.
    
    