tencent cloud

Feedback

SDK for Java

Last updated: 2022-09-27 15:17:14

    Overview

    This document describes how to use open-source SDK to send and receive messages using the SDK for Java as an example and helps you better understand the message sending and receiving processes.

    Prerequisites

    Directions

    Step 1. Install the Java dependency library

    Introduce dependencies in a Java project and add the following dependencies to the pom.xml file. This document uses a Maven project as an example.

    Note:

    The dependency version must be v4.6.1 or later.

    <!-- in your <dependencies> block -->
    <dependency>
    <groupid>org.apache.rocketmq</groupid>
    <artifactid>rocketmq-client</artifactid>
    <version>4.6.1</version>
    </dependency>
    <dependency>
    <groupid>org.apache.rocketmq</groupid>
    <artifactid>rocketmq-acl</artifactid>
    <version>4.6.1</version>
    </dependency>

    Step 2. Produce messages

    1. Create message producers.
      // Instantiate the message producer
      DefaultMQProducer producer = new DefaultMQProducer(
      namespace,
      groupName,
      new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL permission
      )
      ;
      // Set the NameServer address
      producer.setNamesrvAddr(nameserver);
      // Start the producer instances
      producer.start();
      Parameter Description
      namespace Namespace name, which can be copied under the Namespace tab in the console. Its format is cluster ID + | + namespace.
      groupName Producer group name, which can be copied under the Group tab on the Clusterpage in the console.
      nameserver Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new shared or exclusive clusters can be copied from the Namespace list.
      secretKey Role name, which can be copied on the Role Management page.
      accessKey Role token, which can be copied in the Token column on the Role Management page. img
    2. Send messages, which can be sent in the sync, async, or one-way mode.
      • Sync sending
        for (int i = 0; i < 10; i++) {
        // Create a message instance and set the topic and message content
        Message msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
        // Send the message
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
        }
        Parameter Description
        topic_name Topic name, which can be copied under the Topic tab on the Cluster page in the console.
        TAG A parameter used to set the message tag.
      • Async sending
        // Disable retry upon sending failures
        producer.setRetryTimesWhenSendAsyncFailed(0);
        // Set the number of messages to be sent
        int messageCount = 10;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++)
        {
        try {
        final int index = i;
        // Create a message instance and set the topic and message content
        Message msg = new Message(topic_name, "TAG", ("Hello rocketMq " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
        producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
        // Logic for message sending successes
        countDownLatch.countDown();
        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
        }

        @Override
        public void onException(Throwable e) {
        // Logic for message sending failures
        countDownLatch.countDown();
        System.out.printf("%-10d Exception %s %n", index, e);
        e.printStackTrace();
        }
        });
        } catch (Exception e) {
        e.printStackTrace();
        }
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        Parameter Description
        topic_name Topic name, which can be copied under the Topic tab on the Cluster page in the console.
        TAG A parameter used to set the message tag.
      • One-way sending
        for (int i = 0; i < 10; i++) {
        // Create a message instance and set the topic and message content
        Message msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
        // Send one-way messages
        producer.sendOneway(msg);
        }
        Parameter Description
        topic_name Topic name, which can be copied under the Topic tab on the Cluster page in the console.
        TAG A parameter used to set the message tag.
    Note:

    For more information on batch sending or other scenarios, see the demo or RocketMQ documentation.

    Step 3. Consume messages

    1. Create a consumer. TDMQ for RocketMQ supports two consumption modes: push and pull.
      • For consumers using the push mode:
        // Instantiate the consumer
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(
        namespace,
        groupName,
        new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)))
        ; // ACL permission
        // Set the NameServer address
        pushConsumer.setNamesrvAddr(nameserver);
        Parameter Description
        namespace Namespace name, which can be copied on the Namespace tab in the console. Its format is cluster ID + | + namespace.
        groupName Producer group name, which can be copied under the Group tab on the Cluster page in the console.
        nameserver Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new shared or exclusive clusters can be copied from the Namespace list.
        secretKey Role name, which can be copied on the Role Management page.
        accessKey Role token, which can be copied in the Token column on the Role Management page. img
      • For consumers using the pull mode:
        // Instantiate the consumer
        DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer(
        namespace,
        groupName,
        new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)))
        ;
        // Set the NameServer address
        pullConsumer.setNamesrvAddr(nameserver);
        // Specify the first offset as the start offset for consumption
        pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        Parameter Description
        namespace Namespace name, which can be copied on the Namespace tab in the console. Its format is cluster ID + | + namespace.
        groupName Producer group name, which can be copied under the Group tab on the Cluster page in the console. Namespace access addresses in new shared or exclusive clusters can be copied from the Namespace list.
        nameserver Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new shared or exclusive clusters can be copied from the Namespace list.
        secretKey Role name, which can be copied on the Role Management page.
        accessKey Role token, which can be copied in the Token column on the Role Management page. img
    Note:

    For more consumption mode information, see the demo or RocketMQ documentation.

    1. Subscribed to messages. The subscription modes vary by consumption mode.
      • Subscription in push mode
        // Subscribe to a topic
        pushConsumer.subscribe(topic_name, "*");
        // Register a callback implementation class to process messages pulled from the broker
        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        // Message processing logic
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // Mark the message as being successfully consumed and return the consumption status
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // Start the consumer instance
        pushConsumer.start();
        Parameter Description
        topic_name Topic name, which can be copied under the Topic tab on the Cluster page in the console.
        "*" If the subscription expression is left empty or specified as asterisk (*), all messages are subscribed to. tag1 || tag2 || tag3 means subscribing to multiple types of tags.
      • Subscription in pull mode
        // Subscribe to a topic
        pullConsumer.subscribe(topic_name, "*");
        // Start the consumer instance
        pullConsumer.start();
        try {
        System.out.printf("Consumer Started.%n");
        while (true)
        {
        // Pull the message
        List<messageext> messageExts = pullConsumer.poll();
        System.out.printf("%s%n", messageExts);
        }
        } finally {
        pullConsumer.shutdown();
        Parameter Description
        topic_name Topic name, which can be copied under the Topic tab on the Cluster page in the console.
        "*" If the subscription expression is left empty or specified as asterisk (*), all messages are subscribed to. tag1 || tag2 || tag3 means subscribing to multiple types of tags.

    Step 4. View consumption details

    Log in to the TDMQ console, go to the Cluster > Group page, and view the list of clients connected to the group. Click View Details in the Operation column to view consumer details.
    img

    Note:

    Above is a brief introduction to message publishing and subscription. For more information, see the demo or RocketMQ documentation

    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support