tencent cloud

Feedback

Access over HTTP

Last updated: 2023-05-16 11:07:52

    Overview

    TDMQ for RocketMQ can be accessed over the HTTP protocol from the private or public network. It is compatible with HTTP SDKs for multiple programming languages in the community.
    This document describes how to send and receive messages with an HTTP SDK, using the SDK for Java as an example, to help you better understand the message sending and receiving processes.
    Note
    Currently, transactional messages and sequential messages are not supported over HTTP.
    When creating a consumer group, you need to specify the type (TCP or HTTP, as described in Group Management); therefore, a consumer group does not support simultaneous consumption by TCP and HTTP clients.

    Prerequisites

    You have created the required resources as instructed in Resource Creation and Preparation.
    You have imported dependencies through Maven and added SDK dependencies of the corresponding programming language in the pom.xml file.
    For more examples, see the demos in the open-source community.

    Retry Mechanism

    Every message consumed over HTTP will have an invisibility time of 5 minutes.
    If the client acknowledges a message within the invisibility time, the consumption is successful and will not be retried.
    If the client does not acknowledge a message after the invisibility time elapses, the message will become visible again, that is, the client will consume the message again subsequently.
    Note that once the invisibility time of a message elapses during a consumption attempt, the message handle obtained for that attempt becomes invalid and can no longer be used to acknowledge the message.

    Directions

    Step 1. Import dependencies

    Import the SDK dependencies of the corresponding programming language into the pom.xml file of the project.

    Step 2. Get parameters

    1. Log in to the TDMQ console, select the target cluster, and click the cluster name to enter the cluster details page.
    2. Select the Namespace tab at the top and click Configure Permission on the right to enter the permission configuration page. If the role list is empty, click Create to create a role. For more information, see Resource Creation and Preparation.
    
    3. Copy the AK and SK on the page for use in next steps.
    

    Step 3. Initialize the producer client

    JAVA
    PHP
    NodeJS
    import com.aliyun.mq.http.MQClient;
    import com.aliyun.mq.http.MQProducer;
    
    /**
     * Minimal HTTP producer skeleton: builds an MQClient, obtains a producer
     * for one topic, and always releases the client when done.
     */
    public class Producer {

        public static void main(String[] args) {
            MQClient mqClient = new MQClient(
                    // HTTP access point
                    "${HTTP_ENDPOINT}",
                    // Access key, which can be created and obtained in the TDMQ for RocketMQ console.
                    "${ACCESS_KEY}",
                    // Role name, which can be created and obtained in the TDMQ for RocketMQ console.
                    "${SECRET_KEY}"
            );

            try {
                // The topic used for sending messages, which is required and can be obtained in the TDMQ for RocketMQ console.
                final String topic = "${TOPIC}";
                // The namespace of the topic, which is required and can be obtained in the TDMQ console.
                final String instanceId = "${INSTANCE_ID}";

                // Create a producer
                MQProducer producer = mqClient.getProducer(instanceId, topic);

                // Send the message
            } finally {
                // Close the client even if creating the producer or sending a message throws.
                mqClient.close();
            }
        }
    }
    require "vendor/autoload.php";

    // Single namespace separator: a doubled backslash here is a parse error.
    use MQ\MQClient;

    /**
     * Minimal HTTP producer skeleton: the constructor builds the MQ client
     * and obtains a producer for one topic; run() is where messages are sent.
     */
    class ProducerTest
    {
        private $client;
        private $producer;

        public function __construct()
        {
            // Replace each placeholder string below with the real value before running.
            $this->client = new MQClient(
                // HTTP access point
                "${HTTP_ENDPOINT}",
                // Access key, which can be created and obtained in the TDMQ for RocketMQ console.
                "${ACCESS_KEY}",
                // Role name, which can be created and obtained in the TDMQ for RocketMQ console.
                "${SECRET_KEY}"
            );

            // The topic used for sending messages, which is required and can be obtained in the TDMQ for RocketMQ console.
            $topic = "${TOPIC}";
            // The namespace of the topic, which is required and can be obtained in the TDMQ console.
            $instanceId = "${INSTANCE_ID}";

            $this->producer = $this->client->getProducer($instanceId, $topic);
        }

        public function run()
        {
            // Send the message
        }
    }


    $instance = new ProducerTest();
    $instance->run();
    const {
      MQClient,
      MessageProperties
    } = require('@aliyunmq/mq-http-sdk');

    // Set HTTP access endpoints
    const endpoint = "{Endpoint}";
    // AccessKey
    const accessKeyId = "{Accesskey}";
    // SecretKey
    const accessKeySecret = "rop";

    const client = new MQClient(endpoint, accessKeyId, accessKeySecret);

    // Its Topic
    const topic = "TopicA";
    // ID of the instance to which the topic belongs
    const instanceId = "MQ_INST_xxxxx";

    const producer = client.getProducer(instanceId, topic);

    // Publishes 4 messages: even iterations send a keyed message,
    // odd iterations send a timed message delivered 10s later.
    (async function () {
      try {
        // Send 4 messages in a loop
        for (let i = 0; i < 4; i++) {
          // Declared per iteration so nothing leaks into the global scope.
          const msgProps = new MessageProperties();
          // Set attributes
          msgProps.putProperty("key", i);

          let res;
          if (i % 2 === 0) {
            // Set keys
            msgProps.messageKey("MessageKey");
            res = await producer.publishMessage("hello mq.", "", msgProps);
          } else {
            // Timed message, with the time being 10s later
            msgProps.startDeliverTime(Date.now() + 10 * 1000);
            res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);
          }
          console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
        }
      } catch (e) {
        // The message failed to be sent and needs to be retried. You can resend this message or persist this data entry.
        console.log(e);
      }
    })();
    

    Step 4. Initialize the consumer client

    JAVA
    PHP
    NodeJS
    import com.aliyun.mq.http.MQClient;
    import com.aliyun.mq.http.MQConsumer;
    
    /**
     * Minimal HTTP consumer skeleton: builds an MQClient, obtains a consumer
     * for one topic/group pair, then polls forever.
     */
    public class Consumer {

        public static void main(String[] args) {
            // Namespace of the topic; required, available in the TDMQ console.
            final String instanceId = "${INSTANCE_ID}";
            // Topic to consume from; required, available in the TDMQ console.
            final String topic = "${TOPIC}";
            // Consumer group name; required, available in the TDMQ console.
            final String groupId = "${GROUP_ID}";

            // Arguments, in order: HTTP access point, access key, role name.
            // The key and role can be created and obtained in the TDMQ for RocketMQ console.
            MQClient mqClient = new MQClient(
                    "${HTTP_ENDPOINT}",
                    "${ACCESS_KEY}",
                    "${SECRET_KEY}"
            );

            // The last argument (message tag) is left as null.
            final MQConsumer consumer = mqClient.getConsumer(instanceId, topic, groupId, null);

            // Poll indefinitely.
            while (true) {
                // Consume a message
            }
        }
    }
    require "vendor/autoload.php";

    // Single namespace separator: a doubled backslash here is a parse error.
    use MQ\MQClient;

    /**
     * Minimal HTTP consumer skeleton: the constructor builds the MQ client
     * and obtains a consumer for one topic/group pair; run() polls forever.
     */
    class ConsumerTest
    {
        private $client;
        private $consumer;

        public function __construct()
        {
            // Replace each placeholder string below with the real value before running.
            $this->client = new MQClient(
                // HTTP access point
                "${HTTP_ENDPOINT}",
                // Access key, which can be created and obtained in the TDMQ for RocketMQ console.
                "${ACCESS_KEY}",
                // Role name, which can be created and obtained in the TDMQ for RocketMQ console.
                "${SECRET_KEY}"
            );

            // The topic used for consuming messages, which is required and can be obtained in the TDMQ console.
            $topic = "${TOPIC}";
            // Consumer group name, which is required and can be obtained in the TDMQ console.
            $groupId = "${GROUP_ID}";
            // The namespace of the topic, which is required and can be obtained in the TDMQ console.
            $instanceId = "${INSTANCE_ID}";

            $this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
        }

        public function run()
        {
            // Poll indefinitely.
            while (True) {
                // Consume a message
            }
        }
    }


    $instance = new ConsumerTest();
    $instance->run();
    const {
      MQClient
    } = require('@aliyunmq/mq-http-sdk');

    // Set HTTP access endpoints
    const endpoint = "{Endpoint}";
    // AccessKey
    const accessKeyId = "{Accesskey}";
    // SecretKey
    const accessKeySecret = "rop";

    const client = new MQClient(endpoint, accessKeyId, accessKeySecret);

    // Its Topic
    const topic = "TopicA";
    // ID of the instance to which the topic belongs
    const instanceId = "MQ_INST_xxxxx";
    // The consumer group you created in the console
    const groupId = "GID_xxx";

    const consumer = client.getConsumer(instanceId, topic, groupId);

    // Polls for messages forever: logs each batch, then acknowledges it.
    (async function () {
      // Consume messages in loop
      while (true) {
        try {
          // long polling of consumption messages
          // Long polling means that if the topic has no messages, the request will hang on the server for 3 seconds. If there is a message that can be consumed within 3 seconds, it will return immediately.
          const consumeRes = await consumer.consumeMessage(
            3, // This indicates a maximum of 3 messages can be consumed at a time. Up to 16 messages can be set.
            3 // Long polling lasts 3 seconds, which can be set up to 30 seconds.
          );

          if (consumeRes.code !== 200) {
            continue;
          }

          // Consume messages based on business processing logic
          console.log("Consume Messages, requestId:%s", consumeRes.requestId);
          const handles = [];
          for (const message of consumeRes.body) {
            console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" +
              ",Props:%j,MessageKey:%s,Prop-A:%s",
              message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
              message.MessageBody, message.Properties, message.MessageKey, message.Properties.a);
            handles.push(message.ReceiptHandle);
          }

          // If a message is not acked for successful consumption before `message.NextConsumeTime`, it will be consumed repeatedly.
          // The message handle has a timestamp that changes each time the same message is consumed.
          const ackRes = await consumer.ackMessage(handles);
          if (ackRes.code !== 204) {
            // The handle of some messages may time out, which will cause the acknowledgement to fail.
            console.log("Ack Message Fail:");
            const failHandles = [];
            for (const error of ackRes.body) {
              console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
              failHandles.push(error.ReceiptHandle);
            }
            for (const handle of handles) {
              if (failHandles.indexOf(handle) < 0) {
                console.log("\tSucHandle:%s\n", handle);
              }
            }
          } else {
            // The message is acked for successful consumption
            console.log("Ack Message suc, RequestId:%s\n\t", ackRes.requestId, handles.join(','));
          }
        } catch (e) {
          // Errors without a string `Code` (e.g. network failures) must not throw
          // here, or the rejection would escape the loop and stop consumption.
          const code = e && e.Code;
          if (typeof code === "string" && code.indexOf("MessageNotExist") > -1) {
            // If there is no message, long polling will continue on the server.
            console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
          } else {
            console.log(e);
          }
        }
      }
    })();

    

    
    
    
    
    
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our ticket service is available 24/7.

    7x24 Phone Support