tencent cloud

Feedback

Sending and Receiving Delayed Messages

Last updated: 2023-05-16 11:07:52

    Overview

    This document describes how to use open-source SDK to send and receive timed messages by using the SDK for Java as an example.

    Prerequisites

    You have created the required resources as instructed in Resource Creation and Preparation.
    You have downloaded the demo here or have downloaded one at the GitHub project.

    Directions

    Step 1. Install the Java dependent library

    Introduce dependencies in a Java project and add the following dependencies to the pom.xml file. This document uses a Maven project as an example.
    Note
    The dependency version must be v4.9.3 or later, preferably v4.9.4.
    <!-- in your <dependencies> block -->
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.4</version>
    </dependency>
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>4.9.4</version>
    </dependency>

    Step 2. Produce messages

    Creating a message producer

    // Instantiate the message producer
    DefaultMQProducer producer = new DefaultMQProducer(
    groupName,
    new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL permission
    );
    // Set the Nameserver address
    producer.setNamesrvAddr(nameserver);
    // Start the producer instance
    producer.start();
    
    Parameter
    Description
    groupName
    Producer group name. It is recommended to use the corresponding topic name.
    nameserver
    Cluster access address, which can be obtained from Access Address in the Operation column on the Cluster Management page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list.
    secretKey
    Role name, which can be copied on the Role Management page.
    accessKey
    Role token, which can be copied in the Token column on the Role Management page.
    

    Sending a message

    Messages with fixed delay level

    int totalMessagesToSend = 5;
    for (int i = 0; i < totalMessagesToSend; i++) {
    Message message = new Message(TOPIC_NAME, ("Hello scheduled message " + i).getBytes());
    // Set message delay level
    message.setDelayTimeLevel(5);
    // Send the message
    SendResult sendResult = producer.send(message);
    System.out.println("sendResult = " + sendResult);
    }

    Messages with random delay time

    int totalMessagesToSend = 1;
    for (int i = 0; i < totalMessagesToSend; i++) {
    Message message = new Message(TOPIC_NAME, ("Hello timer message " + i).getBytes());
    // Set the time for sending the message
    long timeStamp = System.currentTimeMillis() + 30000;
    // To send a timed message, you need to specify a time for it, and the message will be delivered at the specified time. For example, if you set the time to be 2022-08-08 08:08:08, the message will be delivered at 2022-08-08 08:08:08.
    // If the timestamp is set before the current time, the message will be delivered to the consumer immediately.
    // Set `__STARTDELIVERTIME` into the property of `msg`
    message.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));
    // Send the message
    SendResult sendResult = producer.send(message);
    System.out.println("sendResult = " + sendResult);
    }

    Step 3. Consume messages

    ####Creating a consumer

    TDMQ for RocketMQ supports two consumption modes: push and pull. Push mode is recommended.
    // Instantiate the consumer
    DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(
    groupName,
    new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL permission
    // Set the Nameserver address
    pushConsumer.setNamesrvAddr(nameserver);
    Parameter
    Description
    groupName
    Producer group name, which can be copied under the Group tab on the Cluster page in the console.
    nameserver
    Cluster access address, which can be obtained from Access Address in the Operation column on the Cluster Management page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list.
    secretKey
    Role name, which can be copied on the Role Management page.
    accessKey
    Role token, which can be copied in the Token column on the Role Management page.
    

    Subscribing to messages

    The subscription modes vary by consumption mode.
    // Subscribe to a topic
    pushConsumer.subscribe(topic_name, "*");
    // Register a callback implementation class to process messages pulled from the broker
    pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    // Message processing logic
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    // Mark the message as being successfully consumed and return the consumption status
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    // Start the consumer instance
    pushConsumer.start();
    Parameter
    Description
    topic_name
    Topic name, which can be copied under the Topic tab on the Cluster page in the console.
    "*"
    If the subscription expression is left empty or specified as asterisk (*), all messages are subscribed to. tag1 || tag2 || tag3 means subscribing to multiple types of tags.

    Step 4. View consumption details

    Log in to the TDMQ console, go to the Cluster > Group page, and view the list of clients connected to the group. Click View Details in the Operation column to view consumer details.
    
    
    Note
    Above is a brief introduction to message publishing and subscription. For more information, see Demo or RocketMQ documentation.
    
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support