tencent cloud

文档反馈

最后更新时间:2023-11-24 14:33:34

    操作场景

    本文以调用 Go SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

    前提条件

    操作步骤

    1. 在客户端环境执行如下命令下载 RocketMQ 客户端相关的依赖包。
    go get github.com/apache/rocketmq-client-go/v2
    2. 在对应的方法内创建生产者,如您需要发送普通消息,则在 syncSendMessage.go 文件内修改对应的参数。
    延时消息目前支持任意精度的延时,且不受 delay level 的影响。
    普通消息
    延时消息
    // 服务接入地址 (注意:需要在接入地址前面加上 http:// 或 https:// 否则无法解析)
    var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"
    // 授权角色名
    var secretKey = "admin"
    // 授权角色密钥
    var accessKey = "eyJrZXlJZC...."
    // 命名空间全称
    var nameSpace = "MQ_INST_rocketmqem4xxxx"
    // 生产者组名称
    var groupName = "group1"
    // 创建消息生产者
    p, _ := rocketmq.NewProducer(
    // 设置服务地址
    producer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),
    // 设置acl权限
    producer.WithCredentials(primitive.Credentials{
    SecretKey: secretKey,
    AccessKey: accessKey,
    }),
    // 设置生产组
    producer.WithGroupName(groupName),
    // 设置命名空间名称
    producer.WithNamespace(nameSpace),
    // 设置发送失败重试次数
    producer.WithRetry(2),
    )
    // 启动producer
    err := p.Start()
    if err != nil {
    fmt.Printf("start producer error: %s", err.Error())
    os.Exit(1)
    }
    
    // topic名称
    var topicName = "topic1"
    // 生产者组名称
    var groupName = "group1"
    // 创建消息生产者
    p, _ := rocketmq.NewProducer(
    // 设置服务地址
    producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"})),
    // 设置acl权限
    producer.WithCredentials(primitive.Credentials{
    SecretKey: "admin",
    AccessKey: "eyJrZXlJZC......",
    }),
    // 设置生产组
    producer.WithGroupName(groupName),
    // 设置命名空间名称
    producer.WithNamespace("rocketmq-xxx|namespace_go"),
    // 设置发送失败重试次数
    producer.WithRetry(2),
    )
    // 启动producer
    err := p.Start()
    if err != nil {
    fmt.Printf("start producer error: %s", err.Error())
    os.Exit(1)
    }
    for i := 0; i < 1; i++ {
    msg := primitive.NewMessage(topicName, []byte("Hello RocketMQ Go Client! This is a delay message."))
    // 设置延迟等级
    // 等级与时间对应关系:
    // 1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
    // 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
    //如果想用延迟级别,那么设置下面这个方法
    
    msg.WithDelayTimeLevel(3)
    //如果想用任意延迟消息,那么设置下面这个方法,WithDelayTimeLevel 就不要设置了,单位为具体的毫秒,如下则是10s后投递
    delayMills := int64(10 * 1000)
    msg.WithProperty("__STARTDELIVERTIME", strconv.FormatInt(time.Now().Unix()+delayMills, 10))
    // 发送消息
    res, err := p.SendSync(context.Background(), msg)
    if err != nil {
    fmt.Printf("send message error: %s\\n", err)
    } else {
    fmt.Printf("send message success: result=%s\\n", res.String())
    }
    }
    
    // 释放资源
    err = p.Shutdown()
    if err != nil {
    fmt.Printf("shutdown producer error: %s", err.Error())
    }
    参数
    说明
    secretKey
    角色名称,在 角色管理 页面复制。
    accessKey
    角色密钥,在 角色管理 页面复制密钥列复制。
    
    
    
    nameSpace
    命名空间的名称,在控制台命名空间页面复制。
    serverAddress
    集群接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。 (注意:需要在接入地址前面加上 http:// 或 https:// 否则无法解析)。
    
    
    
    groupName
    生产者组名称,在控制台 Group 页面复制。
    3. 发送消息同上(以同步发送方式为例)。
    // topic名称
    var topicName = "topic1"
    // 构造消息内容
    msg := &primitive.Message{
    Topic: topicName, // 设置topic名称
    Body: []byte("Hello RocketMQ Go Client! This is a new message."),
    }
    // 设置tag
    msg.WithTag("TAG")
    // 设置key
    msg.WithKeys([]string{"yourKey"})
    // 发送消息
    res, err := p.SendSync(context.Background(), msg)
    if err != nil {
    fmt.Printf("send message error: %s\\n", err)
    } else {
    fmt.Printf("send message success: result=%s\\n", res.String())
    }
    参数
    说明
    topicName
    Topic 名称在控制台集群管理中 Topic 页签中复制具体 Topic 名称。
    TAG
    消息 TAG 标识。
    yourKey
    消息业务 key。
    资源释放。
    // 关闭生产者
    err = p.Shutdown()
    if err != nil {
    fmt.Printf("shutdown producer error: %s", err.Error())
    }
    说明
    异步发送、单向发送等,可参见 demo 示例 或参见 rocketmq-client-go 示例
    4. 创建消费者。
    // 服务接入地址 (注意:需要在接入地址前面加上 http:// 或 https:// 否则无法解析)
    var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"
    // 授权角色名
    var secretKey = "admin"
    // 授权角色密钥
    var accessKey = "eyJrZXlJZC...."
    // 命名空间全称
    var nameSpace = "rocketmq-xxx|namespace_go"
    // 生产者组名称
    var groupName = "group11"
    // 创建consumer
    c, err := rocketmq.NewPushConsumer(
    // 设置消费者组
    consumer.WithGroupName(groupName),
    // 设置服务地址
    consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),
    // 设置acl权限
    consumer.WithCredentials(primitive.Credentials{
    SecretKey: secretKey,
    AccessKey: accessKey,
    }),
    // 设置命名空间名称
    consumer.WithNamespace(nameSpace),
    // 设置从起始位置开始消费
    consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
    // 设置消费模式(默认集群模式)
    consumer.WithConsumerModel(consumer.Clustering),
    //广播消费,设置一下实例名,设置为应用的系统名即可。如果不设置,会使用pid,这会导致重启消费重复
    consumer.WithInstance("xxxx"),
    )
    if err != nil {
    fmt.Println("init consumer2 error: " + err.Error())
    os.Exit(0)
    }
    参数
    说明
    secretKey
    角色名称,在 角色管理 页面复制。
    accessKey
    角色密钥,在 角色管理 页面复制密钥列复制。
    
    
    
    nameSpace
    命名空间全称在控制台集群管理中 Topic 页签中页面复制,格式是集群 ID +|+命名空间。
    serverAddress
    集群接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。 (注意:需要在接入地址前面加上 http:// 或 https:// 否则无法解析)
    
    
    
    groupName
    生产者组名称,在控制台 Group 页面复制。
    5. 消费消息。
    // topic名称
    var topicName = "topic1"
    // 设置订阅消息的tag
    selector := consumer.MessageSelector{
    Type: consumer.TAG,
    Expression: "TagA || TagC",
    }
    // 设置重新消费的延迟级别,共支持18种延迟级别。下面是延迟级别与延迟时间的对应关系
    // 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
    // 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
    delayLevel := 1
    err = c.Subscribe(topicName, selector, func(ctx context.Context,
    msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
    fmt.Printf("subscribe callback len: %d \\n", len(msgs))
    // 设置下次消费的延迟级别
    concurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)
    concurrentCtx.DelayLevelWhenNextConsume = delayLevel // only run when return consumer.ConsumeRetryLater
    for _, msg := range msgs {
    // 模拟重试3次后消费成功
    if msg.ReconsumeTimes > 3 {
    fmt.Printf("msg ReconsumeTimes > 3. msg: %v", msg)
    return consumer.ConsumeSuccess, nil
    } else {
    fmt.Printf("subscribe callback: %v \\n", msg)
    }
    }
    // 模拟消费失败,回复重试
    return consumer.ConsumeRetryLater, nil
    })
    if err != nil {
    fmt.Println(err.Error())
    }
    参数
    说明
    topicName
    Topic 的名称,在控制台 Topic 页面复制。
    Expression
    消息 TAG 标识。
    delayLevel
    设置重新消费的延迟级别,共支持18种延迟级别。
    6. 消费消息 (消费者消费消息必须在订阅之后)。
    // 开始消费
    err = c.Start()
    if err != nil {
    fmt.Println(err.Error())
    os.Exit(-1)
    }
    time.Sleep(time.Hour)
    // 资源释放
    err = c.Shutdown()
    if err != nil {
    fmt.Printf("shundown Consumer error: %s", err.Error())
    }
    7. 查看消费详情。登录 TDMQ 控制台,在集群管理 > Group 页面,可查看与 Group 连接的客户端列表,单击操作列的查看消费者详情,可查看消费者详情。
    
    
    
    
    说明
    本文简单介绍了使用 Go 客户端进行简单的收发消息,更多操作可参见 Demorocketmq-client-go 示例
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持