Sending and Receiving Sequential Messages

Last updated: 2023-09-13 11:37:45

    Overview

    TDMQ for RocketMQ can be accessed over the HTTP protocol from the private or public network. It is compatible with HTTP SDKs for multiple programming languages in the community.
    This document uses the SDK for Java as an example to describe how to send and receive sequential messages with the HTTP SDK, so that you can better understand the message sending and receiving processes.
    Note
    Currently, transactional messages are not supported over HTTP.
    As a consumer group does not support simultaneous consumption by TCP and HTTP clients, you need to specify the type (TCP or HTTP) when creating a consumer group. For more information, see Group Management.

    Prerequisites

    You have created the required resources as instructed in Resource Creation and Preparation.
    You have imported dependencies through Maven and added SDK dependencies of the corresponding programming language in the pom.xml file.
    For more examples, see the demos in the open-source community.

    Retry Mechanism

    A fixed retry interval is used in HTTP, which can't be customized currently.
    Message Type          Retry Interval    Maximum Number of Retries
    General message       5 minutes         288
    Sequential message    1 minute          288
    Note
    If the client acknowledges a message within the retry interval, the message consumption is successful and will not be retried.
    If the client doesn’t acknowledge a message after the retry interval has expired, the message will become visible again, and the client will consume it again.
    The message handle obtained each time a message is consumed is valid only within the retry interval and becomes invalid after that period.
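To put the retry table and notes above in concrete terms, here is a minimal sketch that uses only the JDK. It is not part of the HTTP SDK: the class and method names are hypothetical, and it assumes that the longest period over which a message can be redelivered is simply the retry interval multiplied by the maximum number of retries.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative helper only; these names do not exist in the HTTP SDK.
class RetryWindowSketch {

    // Assumed upper bound on redelivery time: retry interval x maximum number of retries.
    static Duration maxRetryWindow(Duration retryInterval, int maxRetries) {
        return retryInterval.multipliedBy(maxRetries);
    }

    // A handle is treated as usable only before receivedAt + retryInterval.
    static boolean handleStillValid(Instant receivedAt, Instant now, Duration retryInterval) {
        return now.isBefore(receivedAt.plus(retryInterval));
    }

    public static void main(String[] args) {
        // General message: 5-minute interval, 288 retries.
        System.out.println(maxRetryWindow(Duration.ofMinutes(5), 288));
        // Sequential message: 1-minute interval, 288 retries.
        System.out.println(maxRetryWindow(Duration.ofMinutes(1), 288));
    }
}
```

Under that assumption, a general message can be redelivered for about 24 hours (288 x 5 minutes) and a sequential message for about 4 hours and 48 minutes (288 x 1 minute). A consumer could use a check like handleStillValid to avoid acknowledging with a handle that has probably expired.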

    Directions

    Step 1. Install the Java dependency library

    In your Java project, add the following dependency to the pom.xml file. This document uses a Maven project as an example.
    <!-- in your <dependencies> block -->
    <dependency>
        <groupId>com.aliyun.mq</groupId>
        <artifactId>mq-http-sdk</artifactId>
        <version>1.0.3</version>
    </dependency>

    Step 2. Get parameters

    1. Log in to the TDMQ console, select the target cluster, and click the cluster name to enter the cluster details page.
    2. Select the Namespace tab at the top and click Configure Permission on the right to enter the permission configuration page. If the role list is empty, click Create to create a role. For more information, see Resource Creation and Preparation.
    
    3. Copy the AK and SK on the page for use in the next steps.
    

    Step 3. Produce messages

    Creating a message producer

    import com.aliyun.mq.http.MQClient;
    import com.aliyun.mq.http.MQProducer;

    // Get the client
    MQClient mqClient = new MQClient(endpoint, accessKey, secretKey);

    // Get the topic producer
    MQProducer producer = mqClient.getProducer(namespace, topicName);
    Parameter    Description
    topicName    Topic name, which can be copied under the Topic tab on the Cluster page in the console.
    namespace    Namespace name, which can be copied under the Namespace tab on the Cluster page in the console.
    endpoint     Cluster access address over HTTP, which can be obtained from Access Address in the Operation column on the Cluster page in the console.
    secretKey    Role name, which can be copied on the Role Management page.
    accessKey    Role token, which can be copied in the Token column on the Role Management page.
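Because the parameters above include credentials, it can help to load them from configuration instead of hardcoding them. The following is a minimal sketch under stated assumptions: the class name and the setting names (such as TDMQ_HTTP_ENDPOINT) are hypothetical and are not defined by TDMQ or the SDK. It only validates that every value is present before a client is created.

```java
import java.util.Map;

// Hypothetical holder for the five connection parameters described above.
class HttpClientSettingsSketch {
    final String endpoint;
    final String accessKey;
    final String secretKey;
    final String namespace;
    final String topicName;

    private HttpClientSettingsSketch(Map<String, String> source) {
        // The setting names below are illustrative assumptions, not names defined by TDMQ.
        this.endpoint = require(source, "TDMQ_HTTP_ENDPOINT");
        this.accessKey = require(source, "TDMQ_ACCESS_KEY");
        this.secretKey = require(source, "TDMQ_SECRET_KEY");
        this.namespace = require(source, "TDMQ_NAMESPACE");
        this.topicName = require(source, "TDMQ_TOPIC");
    }

    // Builds the settings from any key-value source, for example System.getenv().
    static HttpClientSettingsSketch from(Map<String, String> source) {
        return new HttpClientSettingsSketch(source);
    }

    // Fails fast with a clear message when a value is missing or blank.
    private static String require(Map<String, String> source, String name) {
        String value = source.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value.trim();
    }
}
```

For example, HttpClientSettingsSketch.from(System.getenv()) would read the values from environment variables, and the resulting fields could then be passed to the MQClient constructor and to getProducer.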
    

    Sending a message

    import com.aliyun.mq.http.model.TopicMessage;

    try {
        for (int i = 0; i < 10; i++) {
            TopicMessage pubMsg = new TopicMessage(
                    ("Hello RocketMQ " + i).getBytes(),
                    "TAG"
            );
            // Set the ShardingKey of the partitionally sequential message.
            // setShardingKey expects a String, so convert the int value.
            pubMsg.setShardingKey(String.valueOf(i % 3));
            TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
            System.out.println("Send mq message success. MsgId is: " + pubResultMsg.getMessageId());
        }
    } catch (Throwable e) {
        System.out.println("Send mq message failed.");
        e.printStackTrace();
    }
    Parameter      Description
    TAG            Message tag.
    ShardingKey    A partition field of sequential messages. Messages with the same ShardingKey will be sent to the same partition.
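The ShardingKey rule above can be illustrated with a small sketch. This document does not describe how the broker actually maps a key to a partition, so the hash-and-modulo mapping below is an assumption chosen only to show the property that matters: the same key always lands in the same partition. The class and method names are hypothetical.

```java
// Illustrative only: the real key-to-partition mapping is internal to the broker.
class ShardingKeySketch {

    // Maps a sharding key to a partition index in [0, partitionCount).
    static int partitionFor(String shardingKey, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(shardingKey.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        // The sending sample uses i % 3 as the key, so only three distinct keys occur.
        for (int i = 0; i < 10; i++) {
            String key = String.valueOf(i % 3);
            System.out.println("message " + i + " -> key " + key
                    + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

In this model, messages 0, 3, 6, and 9 share one key and therefore one partition, which is what allows them to be consumed in the order they were sent. Messages with different keys have no ordering guarantee relative to each other.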

    Step 4. Consume messages

    Creating a consumer

    import com.aliyun.mq.http.MQClient;
    import com.aliyun.mq.http.MQConsumer;

    // Get the client
    MQClient mqClient = new MQClient(endpoint, accessKey, secretKey);

    // Get the topic consumer
    MQConsumer consumer = mqClient.getConsumer(namespace, topicName, groupName, "TAG");
    Parameter    Description
    topicName    Topic name, which can be copied under the Topic tab on the Cluster page in the console.
    groupName    Consumer group name, which can be copied under the Group tab on the Cluster page in the console.
    namespace    Namespace name, which can be copied under the Namespace tab on the Cluster page in the console.
    TAG          Subscribed tag.
    endpoint     Cluster access address over HTTP, which can be obtained from Access Address in the Operation column on the Cluster page in the console.
    secretKey    Role name, which can be copied on the Role Management page.
    accessKey    Role token, which can be copied in the Token column on the Role Management page.
    

    Subscribing to messages

    import java.util.ArrayList;
    import java.util.List;
    import com.aliyun.mq.http.common.AckMessageException;
    import com.aliyun.mq.http.model.Message;

    do {
        List<Message> messages = null;

        try {
            // Long polling consumes messages sequentially. The messages obtained may come from
            // multiple partitions, but the messages within one partition are always in order.
            // For sequential consumption, a message in a partition that has not been acknowledged
            // as consumed successfully will be consumed again the next time for this partition.
            // For a partition, the next batch of messages can be consumed only after all previous
            // messages have been acknowledged as consumed successfully.
            messages = consumer.consumeMessageOrderly(
                    Integer.parseInt(batchSize),
                    Integer.parseInt(waitSeconds)
            );
        } catch (Throwable e) {
            e.printStackTrace();
        }
        if (messages == null || messages.isEmpty()) {
            System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
            continue;
        }

        // Process the messages
        for (Message message : messages) {
            System.out.println("Receive message: " + message);
        }

        // Collect the handles and acknowledge the whole batch
        List<String> handles = new ArrayList<String>();
        for (Message message : messages) {
            handles.add(message.getReceiptHandle());
        }

        try {
            consumer.ackMessage(handles);
        } catch (Throwable e) {
            if (e instanceof AckMessageException) {
                // Some handles could not be acknowledged, for example because they have expired
                AckMessageException errors = (AckMessageException) e;
                System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
                if (errors.getErrorMessages() != null) {
                    for (String errorHandle : errors.getErrorMessages().keySet()) {
                        System.out.println("Handle:" + errorHandle
                                + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                                + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                    }
                }
                continue;
            }
            e.printStackTrace();
        }
    } while (true);
    Parameter      Description
    batchSize      The number of messages pulled at a time. Maximum value: 16.
    waitSeconds    The polling waiting time for a message pull. Maximum value: 30 seconds.
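The per-partition ordering rule described in the consumer code comments (the next batch is delivered only after the previous batch has been acknowledged) can be modeled in memory. The sketch below is a simplified illustration, not the service's implementation: the class is hypothetical, it models a single partition, and it ignores the retry interval by redelivering unacknowledged messages immediately on the next pull.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified in-memory model of one partition of a sequential topic.
class OrderedPartitionSketch {
    private final Deque<String> pending = new ArrayDeque<>();  // not yet delivered
    private final List<String> inFlight = new ArrayList<>();   // delivered, not yet acknowledged

    void publish(String body) {
        pending.addLast(body);
    }

    // Returns the unacknowledged batch again if there is one;
    // otherwise pulls up to batchSize new messages in publish order.
    List<String> consume(int batchSize) {
        if (inFlight.isEmpty()) {
            while (inFlight.size() < batchSize && !pending.isEmpty()) {
                inFlight.add(pending.removeFirst());
            }
        }
        return new ArrayList<>(inFlight);
    }

    // Acknowledges the whole in-flight batch, which unblocks the next batch.
    void ackAll() {
        inFlight.clear();
    }
}
```

With messages a, b, and c published and a batch size of 2, the first pull returns a and b, and every further pull returns a and b again until ackAll is called. Only then does a pull return c. This is why a consumer that fails to acknowledge blocks progress on that partition, while other partitions are unaffected.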
    