tencent cloud

文档反馈

发送与接收顺序消息

最后更新时间:2023-09-13 11:38:17

    操作场景

    TDMQ RocketMQ 版支持用户通过内网或公网使用 HTTP 协议接入,并兼容社区的多语言 HTTP SDK
    本文以调用 Java SDK 为例介绍通过 HTTP SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
    注意:
    暂不支持通过使用 HTTP 协议实现事务消息。
    在创建 Group(消费组)时需要制定类型(TCP 或者 HTTP,详情请参见 创建 Group 说明 ),因此,同一个 Group(消费组)不支持 TCP 和 HTTP 客户端同时消费。

    前提条件

    通过 Maven 方式引入依赖,在 pom.xml 文件中添加对应语言的 SDK 依赖
    更多示例可以参见开源社区的 Demo 示例

    重试机制

    HTTP 采用固定重试间隔的机制,暂不支持自定义。
    消息类型
    重试间隔
    最大重试次数
    普通消息
    5分钟
    288
    顺序消息
    1分钟
    288
    说明:
    客户端在重试间隔内 ACK 这条消息,表示消费成功,不会重试。
    重试间隔到期后客户端仍未 ACK,客户端会重新消费到这条消息。
    每次消费的消息句柄只在重试间隔内有效,过期无效。

    操作步骤

    步骤1:安装 Java 依赖库

    在 Java 项目中引入相关依赖,以 maven 工程为例,在 pom.xml 添加以下依赖:
    <!-- in your <dependencies> block -->
    <dependency>
    <groupId>com.aliyun.mq</groupId>
    <artifactId>mq-http-sdk</artifactId>
    <version>1.0.3</version>
    </dependency>

    步骤2:获取参数

    1. 登录 TDMQ 控制台,选择所在的集群,单击集群名进入集群详情页。
    2. 如下图所示,选择顶部的命名空间页签,单击右侧的配置权限进入权限配置页面,如当前角色列表为空,可以单击新建,新建一个角色,详细描述请参见 完成资源创建与准备
    
    
    
    3. 在页面上复制对应的 AK 和 SK,以备在接下来的步骤中使用。
    
    
    

    步骤3:生产消息

    创建消息生产者

    // 获取Client
    MQClient mqClient = new MQClient(endpoint, accessKey, secretKey);
    
    // 获取Topic的Producer
    MQProducer producer = mqClient.getProducer(namespace, topicName);
    参数
    说明
    topicName
    主题名称,在控制台集群管理中 Topic 页签中复制。
    namespace
    命名空间名称,在控制台集群管理中 Namespace 页签中复制。
    
    
    
    endpoint
    集群 HTTP 协议接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。
    secretKey
    角色名称,在 角色管理 页面复制。
    accessKey
    角色密钥,在 角色管理 页面复制密钥列复制。
    
    
    

    发送顺序消息

    try {
    for (int i = 0; i < 10; i++) {
    TopicMessage pubMsg;
    pubMsg = new TopicMessage(
    ("Hello RocketMQ " + i).getBytes(),
    "TAG"
    );
    // 设置分区顺序消息的 ShardingKey
    pubMsg.setShardingKey(i % 3);
    TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
    System.out.println("Send mq message success. MsgId is: " + pubResultMsg.getMessageId());
    }
    } catch (Throwable e) {
    System.out.println("Send mq message failed.");
    e.printStackTrace();
    }
    参数
    说明
    TAG
    设置消息的 TAG。
    ShardingKey
    顺序消息的分区字段,相同 ShardingKey 的消息会发送到同一个分区。

    步骤4:消费消息

    创建消费者

    // 获取Client
    MQClient mqClient = new MQClient(endpoint, accessKey, secretKey);
    
    // 获取Topic的Consumer
    MQProducer consumer = mqClient.getConsumer(namespace, topicName, groupName, "TAG");
    参数
    说明
    topicName
    主题名称,在控制台集群管理中 Topic 页签中复制。
    groupName
    生产者组名称,在控制台集群管理中 Group 页签中复制。
    namespace
    命名空间名称,在控制台集群管理中 Namespace 页签中复制。
    
    
    
    TAG
    订阅的标签。
    endpoint
    集群 HTTP 协议接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。
    secretKey
    角色名称,在 角色管理 页面复制。
    accessKey
    角色密钥,在 角色管理 页面复制密钥列复制。
    
    
    

    订阅消息

    do {
    List<Message> messages = null;
    
    try {
    // 长轮询顺序消费消息, 拿到的消息可能是多个分区的, 一个分区的内的消息一定是顺序的
    // 对于顺序消费,如果一个分区内的消息只要有没有被确认消费成功的,则对于这个分区下次还会消费到相同的消息
    // 对于一个分区,只有所有消息确认消费成功才能消费下一批消息
    messages = consumer.consumeMessageOrderly(
    Integer.parseInt(batchSize),
    Integer.parseInt(waitSeconds)
    );
    } catch (Throwable e) {
    e.printStackTrace();
    }
    if (messages == null || messages.isEmpty()) {
    System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
    continue;
    }
    
    for (Message message : messages) {
    System.out.println("Receive message: " + message);
    }
    
    {
    List<String> handles = new ArrayList<String>();
    for (Message message : messages) {
    handles.add(message.getReceiptHandle());
    }
    
    try {
    consumer.ackMessage(handles);
    } catch (Throwable e) {
    if (e instanceof AckMessageException) {
    AckMessageException errors = (AckMessageException) e;
    System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
    if (errors.getErrorMessages() != null) {
    for (String errorHandle :errors.getErrorMessages().keySet()) {
    System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
    + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
    }
    }
    continue;
    }
    e.printStackTrace();
    }
    }
    } while (true);
    参数
    说明
    batchSize
    一次拉取的消息条数,支持最多16条。
    waitSeconds
    一次拉取的轮询等待时间,支持最长30秒。
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持