tencent cloud

SDK for Go

Last updated: 2023-03-14 15:54:50

    Overview

    This document describes how to send and receive messages with the open-source RocketMQ SDK, using the SDK for Go as an example, to help you better understand the message sending and receiving processes.

    Prerequisites

    You have created the required resources.
    You have installed Go.
    You have downloaded the demo.

    Directions

    1. Run the following command in the client environment to download the RocketMQ client dependencies.
    go get github.com/apache/rocketmq-client-go/v2
    2. Create a producer.
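    // Packages used by the code snippets in this document
    import (
        "context"
        "fmt"
        "os"
        "time"

        "github.com/apache/rocketmq-client-go/v2"
        "github.com/apache/rocketmq-client-go/v2/consumer"
        "github.com/apache/rocketmq-client-go/v2/primitive"
        "github.com/apache/rocketmq-client-go/v2/producer"
    )

    // Service access address. (Note: Add "http://" or "https://" before the access address; otherwise, it cannot be resolved.)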
    var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"
    // Authorized role name
    var secretKey = "admin"
    // Authorized role token
    var accessKey = "eyJrZXlJZC...."
    // Full namespace name
    var nameSpace = "rocketmq-xxx|namespace_go"
    // Producer group name
    var groupName = "group1"
    // Create a message producer
    p, err := rocketmq.NewProducer(
        // Set the service address
        producer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),
        // Set ACL permissions
        producer.WithCredentials(primitive.Credentials{
            SecretKey: secretKey,
            AccessKey: accessKey,
        }),
        // Set the producer group
        producer.WithGroupName(groupName),
        // Set the namespace name
        producer.WithNamespace(nameSpace),
        // Set the number of retries upon sending failures
        producer.WithRetry(2),
    )
    if err != nil {
        fmt.Printf("create producer error: %s\n", err.Error())
        os.Exit(1)
    }
    // Start the producer
    err = p.Start()
    if err != nil {
        fmt.Printf("start producer error: %s\n", err.Error())
        os.Exit(1)
    }
    Parameter        Description
    secretKey        Role name, which can be copied on the Role Management page.
    accessKey        Role token, which can be copied in the Token column on the Role Management page.
    nameSpace        Full namespace name, which can be copied under the Topic tab on the Cluster page in the console and is in the format of cluster ID + | + namespace.
    serverAddress    Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. (Note: Add http:// or https:// before the access address; otherwise, it cannot be resolved.)
    groupName        Producer group name, which can be copied under the Group tab in the console.
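Two details in the parameter descriptions are easy to get wrong: the access address needs an http:// or https:// prefix, and the full namespace name has the form cluster ID + | + namespace. The following is a minimal sketch of a check for both that could run before the producer is created. It uses only the Go standard library; the helper names validateAddress and splitNamespace are hypothetical and are not part of the RocketMQ client.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// validateAddress returns an error unless addr starts with "http://" or "https://".
func validateAddress(addr string) error {
	if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") {
		return nil
	}
	return fmt.Errorf("access address %q must start with http:// or https://", addr)
}

// splitNamespace splits a full namespace name of the form "clusterID|namespace"
// into its two parts and rejects names where either part is missing.
func splitNamespace(full string) (string, string, error) {
	clusterID, namespace, found := strings.Cut(full, "|")
	if !found || clusterID == "" || namespace == "" {
		return "", "", errors.New("namespace must have the form clusterID|namespace")
	}
	return clusterID, namespace, nil
}

func main() {
	// Placeholder values copied from the example above.
	addr := "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"
	if err := validateAddress(addr); err != nil {
		fmt.Println("invalid address:", err)
		return
	}
	clusterID, namespace, err := splitNamespace("rocketmq-xxx|namespace_go")
	if err != nil {
		fmt.Println("invalid namespace:", err)
		return
	}
	fmt.Printf("cluster=%s namespace=%s\n", clusterID, namespace)
}
```

Failing early on a malformed value tends to give a clearer message than an address resolution error raised later by the client.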
    3. Send messages (using sync sending as an example).
    // Topic name
    var topicName = "topic1"
    // Construct the message content
    msg := &primitive.Message{
        Topic: topicName, // Set the topic name
        Body:  []byte("Hello RocketMQ Go Client! This is a new message."),
    }
    // Set the tag
    msg.WithTag("TAG")
    // Set the key
    msg.WithKeys([]string{"yourKey"})
    // Send the message
    res, err := p.SendSync(context.Background(), msg)
    if err != nil {
        fmt.Printf("send message error: %s\n", err)
    } else {
        fmt.Printf("send message success: result=%s\n", res.String())
    }
    Parameter        Description
    topicName        Topic name, which can be copied under the Topic tab on the Cluster page in the console.
    TAG              Message tag.
    yourKey          Message key.
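The tag set on a message is what a consumer's subscription expression filters on; the consumer later in this document subscribes with the expression "TagA || TagC". As a rough illustration of how such an expression selects messages, the sketch below assumes the usual RocketMQ convention that alternatives are separated by "||" and that "*" (or an empty expression) selects every tag. The function matchesTag is hypothetical: the actual filtering is performed by RocketMQ, not by application code.

```go
package main

import (
	"fmt"
	"strings"
)

// matchesTag reports whether tag is selected by expression.
// Assumed convention: alternatives are separated by "||", and "*" or an
// empty expression selects every tag.
func matchesTag(expression, tag string) bool {
	expression = strings.TrimSpace(expression)
	if expression == "" || expression == "*" {
		return true
	}
	for _, candidate := range strings.Split(expression, "||") {
		if strings.TrimSpace(candidate) == tag {
			return true
		}
	}
	return false
}

func main() {
	expr := "TagA || TagC"
	for _, tag := range []string{"TagA", "TagB", "TagC", "TAG"} {
		fmt.Printf("%-4s selected by %q: %v\n", tag, expr, matchesTag(expr, tag))
	}
}
```

Under this assumption, a message tagged "TAG" as in the sending example is not selected by "TagA || TagC", so the producer tag and the consumer expression need to be chosen together.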
    Release the resources.
    // Close the producer
    err = p.Shutdown()
    if err != nil {
        fmt.Printf("shutdown producer error: %s", err.Error())
    }
    Notes
    For more information on async sending and one-way sending, see the demo or RocketMQ-Client-Go Examples.
    4. Create a consumer.
    // Service access address. (Note: Add "http://" or "https://" before the access address; otherwise, it cannot be resolved.)
    var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"
    // Authorized role name
    var secretKey = "admin"
    // Authorized role token
    var accessKey = "eyJrZXlJZC...."
    // Full namespace name
    var nameSpace = "rocketmq-xxx|namespace_go"
    // Consumer group name
    var groupName = "group11"
    // Create a consumer
    c, err := rocketmq.NewPushConsumer(
        // Set the consumer group
        consumer.WithGroupName(groupName),
        // Set the service address
        consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),
        // Set ACL permissions
        consumer.WithCredentials(primitive.Credentials{
            SecretKey: secretKey,
            AccessKey: accessKey,
        }),
        // Set the namespace name
        consumer.WithNamespace(nameSpace),
        // Set consumption from the start offset
        consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
        // Set the consumption mode (cluster consumption by default)
        consumer.WithConsumerModel(consumer.Clustering),
    )
    if err != nil {
        fmt.Println("init consumer error: " + err.Error())
        os.Exit(1)
    }
    Parameter        Description
    secretKey        Role name, which can be copied on the Role Management page.
    accessKey        Role token, which can be copied in the Token column on the Role Management page.
    nameSpace        Full namespace name, which can be copied under the Topic tab on the Cluster page in the console and is in the format of cluster ID + | + namespace.
    serverAddress    Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. (Note: Add http:// or https:// before the access address; otherwise, it cannot be resolved.)
    groupName        Consumer group name, which can be copied under the Group tab in the console.
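The examples hard-code the role name and token for brevity. One possible alternative, sketched here under the assumption that the settings are supplied through environment variables, is to load them at startup and fail if any is missing. The clientConfig type, the loadConfig function, and the ROCKETMQ_* variable names are all hypothetical; they are not defined by the RocketMQ client or by the console.

```go
package main

import (
	"fmt"
	"os"
)

// clientConfig groups the connection settings described in the table above.
type clientConfig struct {
	ServerAddress string
	SecretKey     string // role name
	AccessKey     string // role token
	Namespace     string
	GroupName     string
}

// loadConfig fills a clientConfig using lookup (for example os.LookupEnv)
// and returns an error naming the first setting that is missing or empty.
// The variable names are hypothetical.
func loadConfig(lookup func(string) (string, bool)) (clientConfig, error) {
	var cfg clientConfig
	settings := []struct {
		name string
		dest *string
	}{
		{"ROCKETMQ_SERVER_ADDRESS", &cfg.ServerAddress},
		{"ROCKETMQ_SECRET_KEY", &cfg.SecretKey},
		{"ROCKETMQ_ACCESS_KEY", &cfg.AccessKey},
		{"ROCKETMQ_NAMESPACE", &cfg.Namespace},
		{"ROCKETMQ_GROUP_NAME", &cfg.GroupName},
	}
	for _, s := range settings {
		value, ok := lookup(s.name)
		if !ok || value == "" {
			return clientConfig{}, fmt.Errorf("environment variable %s is not set", s.name)
		}
		*s.dest = value
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConfig(os.LookupEnv)
	if err != nil {
		fmt.Println("load config error:", err)
		return
	}
	fmt.Println("connecting to", cfg.ServerAddress, "with group", cfg.GroupName)
}
```

The resulting fields could then be passed to the WithNsResolver, WithCredentials, WithNamespace, and WithGroupName options in place of the literal values.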
    5. Subscribe to the topic.
    // Topic name
    var topicName = "topic1"
    // Set the tag of messages that are subscribed to.
    // Only messages whose tag matches the expression are delivered. The message sent in step 3 is tagged "TAG",
    // so add that tag to the expression to receive it.
    selector := consumer.MessageSelector{
        Type:       consumer.TAG,
        Expression: "TagA || TagC",
    }
    // Set the delay level of consumption retry. A total of 18 levels can be set. Below is the relationship between each delay level and the delay time.
    // Level: 1   2   3    4    5   6   7   8   9   10  11  12  13  14   15   16   17  18
    // Delay: 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
    delayLevel := 1
    err = c.Subscribe(topicName, selector, func(ctx context.Context,
        msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
        fmt.Printf("subscribe callback len: %d \n", len(msgs))
        // Set the delay level for the next consumption.
        // It takes effect only when consumer.ConsumeRetryLater is returned.
        concurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)
        concurrentCtx.DelayLevelWhenNextConsume = delayLevel
        for _, msg := range msgs {
            // Simulate a successful consumption once a message has been retried more than three times
            if msg.ReconsumeTimes > 3 {
                fmt.Printf("msg ReconsumeTimes > 3. msg: %v\n", msg)
                return consumer.ConsumeSuccess, nil
            }
            fmt.Printf("subscribe callback: %v \n", msg)
        }
        // Simulate a consumption failure. Retry is required.
        return consumer.ConsumeRetryLater, nil
    })
    if err != nil {
        fmt.Println(err.Error())
    }
    Parameter        Description
    topicName        Topic name, which can be copied on the Topic page in the console.
    Expression       Tag expression that selects the messages to subscribe to.
    delayLevel       A parameter used to set the delay level of consumption retry. A total of 18 delay levels are supported.
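The mapping between the 18 delay levels and their delay times, listed in the code comments above, can be written as a small lookup. This is a minimal sketch for reasoning about retry timing in application code; the names retryDelays and delayForLevel are hypothetical and are not part of the RocketMQ client, which only takes the integer level.

```go
package main

import (
	"fmt"
	"time"
)

// retryDelays holds the delay for levels 1 through 18, in order,
// as listed in the subscription example.
var retryDelays = []time.Duration{
	1 * time.Second, 5 * time.Second, 10 * time.Second, 30 * time.Second,
	1 * time.Minute, 2 * time.Minute, 3 * time.Minute, 4 * time.Minute,
	5 * time.Minute, 6 * time.Minute, 7 * time.Minute, 8 * time.Minute,
	9 * time.Minute, 10 * time.Minute, 20 * time.Minute, 30 * time.Minute,
	1 * time.Hour, 2 * time.Hour,
}

// delayForLevel returns the delay for a level between 1 and 18,
// or an error if the level is outside that range.
func delayForLevel(level int) (time.Duration, error) {
	if level < 1 || level > len(retryDelays) {
		return 0, fmt.Errorf("delay level %d is outside the range 1-%d", level, len(retryDelays))
	}
	return retryDelays[level-1], nil
}

func main() {
	for _, level := range []int{1, 4, 18} {
		delay, err := delayForLevel(level)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("level %d -> %v\n", level, delay)
	}
}
```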
    6. Start the consumer to consume messages (messages can be consumed only after the topic has been subscribed to).
    // Start consumption
    err = c.Start()
    if err != nil {
        fmt.Println(err.Error())
        os.Exit(1)
    }
    // Keep the process alive so that the consumer can receive messages
    time.Sleep(time.Hour)
    // Release the resources
    err = c.Shutdown()
    if err != nil {
        fmt.Printf("shutdown consumer error: %s", err.Error())
    }
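The example keeps the consumer alive with a fixed one-hour sleep. A long-running service would more likely wait for a termination signal and then release the resources. The following is a minimal sketch of that pattern using only the standard library; runUntilDone is a hypothetical helper, and the shutdown function stands in for c.Shutdown() on the real consumer.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// runUntilDone blocks until done is closed or maxRun elapses, whichever
// comes first, then calls shutdown once and returns its error.
func runUntilDone(done <-chan struct{}, maxRun time.Duration, shutdown func() error) error {
	timer := time.NewTimer(maxRun)
	defer timer.Stop()
	select {
	case <-done:
	case <-timer.C:
	}
	return shutdown()
}

func main() {
	// The context is cancelled on Ctrl+C (SIGINT) or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Stand-in for c.Shutdown() on the real consumer.
	shutdown := func() error {
		fmt.Println("consumer shut down")
		return nil
	}

	// A short limit keeps this demo brief; the example above waits one hour.
	if err := runUntilDone(ctx.Done(), 2*time.Second, shutdown); err != nil {
		fmt.Printf("shutdown consumer error: %s\n", err)
	}
}
```

Taking a channel rather than the context itself keeps the helper easy to exercise on its own: any closed channel triggers the shutdown path.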
    7. View consumption details. Log in to the TDMQ console, go to the Cluster > Group page, and view the list of clients connected to the group. Click View Details in the Operation column to view consumer details.
    
    Notes
    The steps above briefly introduce how to send and receive messages with the Go client. For more information, see the demo or RocketMQ-Client-Go Examples.