Spring Cloud Stream

Last updated: 2022-09-21 18:25:04

    Overview

    This document describes how to use Spring Cloud Stream to send and receive messages and helps you better understand the message sending and receiving processes.

    Prerequisites

    Directions

    Step 1. Import dependencies

    Import dependencies related to stream-rocketmq to the pom.xml file.

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-acl</artifactId>
        <version>4.7.1</version>
    </dependency>

    <!-- The RocketMQ version bundled with spring-cloud-starter-stream-rocketmq is old. Exclude it and use the newer version declared above. -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        <version>2.2.5-RocketMQ-RC1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-acl</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    Step 2. Add configurations

    Add RocketMQ-related configurations to the configuration file.

    spring:
      cloud:
        stream:
          rocketmq:
            bindings:
              # Channel name, which is the same as the channel name in spring.cloud.stream.bindings.
              Topic-test1:
                consumer:
                  # Tag type of the subscription, which is configured based on consumers' actual needs. All messages are subscribed to by default.
                  subscription: TAG1
              # Channel name
              Topic-test2:
                consumer:
                  subscription: TAG2
            binder:
              # Full service address
              name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
              # Role name
              secret-key: admin
              # Role token
              access-key: eyJrZXlJZ...
              # Full namespace name
              namespace: rocketmq-xxx|namespace1
              # Producer group name
              group: group1
          bindings:
            # Channel name
            Topic-send:
              # Specify a topic, which refers to the one you created
              destination: topic1
              content-type: application/json
              # The group name to be used
              group: group1
            # Channel name
            Topic-test1:
              destination: topic1
              content-type: application/json
              group: group1
            # Channel name
            Topic-test2:
              destination: topic1
              content-type: application/json
              group: group2
    Note
    1. Currently, only 2.2.5-RocketMQ-RC1 and 2.2.5.RocketMQ.RC2 support the namespace configuration item. If you use another version, you need to concatenate the namespace name with each topic and group name yourself.
    • The format is as follows:
      • rocketmq-pngrpmk94d5o|stream%topic (namespace name%topic name)
      • rocketmq-pngrpmk94d5o|stream%group (namespace name%group name)
    • The format for Shared and Exclusive editions is as follows:
      • MQ_INST_rocketmqpj79obd2ew7v_test%topic (namespace name%topic name)
      • MQ_INST_rocketmqpj79obd2ew7v_test%group (namespace name%group name)
    2. The subscription configuration item is named subscription in 2.2.5-RocketMQ-RC1 and 2.2.5.RocketMQ.RC2 and tags in other versions.
    The complete configuration for other versions is as follows:
    spring:
      cloud:
        stream:
          rocketmq:
            bindings:
              # Channel name, which is the same as the channel name in spring.cloud.stream.bindings.
              Topic-test1:
                consumer:
                  # Tag type of the subscription, which is configured based on consumers' actual needs. All messages are subscribed to by default.
                  tags: TAG1
              # Channel name
              Topic-test2:
                consumer:
                  tags: TAG2
            binder:
              # Full service address
              name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
              # Role name
              secret-key: admin
              # Role token
              access-key: eyJrZXlJZ...
          bindings:
            # Channel name
            Topic-send:
              # Specify a topic in the format of `cluster ID|namespace name%topic name`, which refers to the one you created
              destination: rocketmq-xxx|stream%topic1
              content-type: application/json
              # Name of the group to be used in the format of `cluster ID|namespace name%group name`
              group: rocketmq-xxx|stream%group1
            # Channel name
            Topic-test1:
              destination: rocketmq-xxx|stream%topic1
              content-type: application/json
              group: rocketmq-xxx|stream%group1
            # Channel name
            Topic-test2:
              destination: rocketmq-xxx|stream%topic1
              content-type: application/json
              group: rocketmq-xxx|stream%group2
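
For versions without the namespace item, the destination and group values above follow the `cluster ID|namespace name%topic name` pattern. A minimal sketch of building such names in plain Java is shown here. `QualifiedNames` and its methods are hypothetical helpers written for this illustration, not part of RocketMQ or Spring Cloud Stream, and `rocketmq-xxx` is the placeholder cluster ID from the configuration.

```java
/** Hypothetical helper for illustration; not part of RocketMQ or Spring Cloud Stream. */
final class QualifiedNames {
    private QualifiedNames() {
    }

    /** Builds the full namespace name, for example "rocketmq-xxx|stream". */
    static String fullNamespace(String clusterId, String namespace) {
        return requireText(clusterId, "clusterId") + "|" + requireText(namespace, "namespace");
    }

    /** Prefixes a topic or group name with the full namespace name, separated by '%'. */
    static String qualify(String fullNamespace, String name) {
        String checked = requireText(name, "name");
        if (checked.indexOf('%') >= 0) {
            // Guard against prefixing a name twice.
            throw new IllegalArgumentException("name already contains a namespace prefix: " + checked);
        }
        return requireText(fullNamespace, "fullNamespace") + "%" + checked;
    }

    private static String requireText(String value, String label) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(label + " must not be empty");
        }
        return value.trim();
    }

    public static void main(String[] args) {
        String ns = fullNamespace("rocketmq-xxx", "stream");
        System.out.println(qualify(ns, "topic1")); // value for destination
        System.out.println(qualify(ns, "group1")); // value for group
    }
}
```

For Shared and Exclusive editions, the same `qualify` call can be given the `MQ_INST_...` namespace name directly, since that format has no `cluster ID|` part.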
    Parameter        Description
    name-server      Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new shared or exclusive clusters can be copied from the Namespace list.
    secret-key       Role name, which can be copied on the Role Management page.
    access-key       Role token, which can be copied in the Token column on the Role Management page.
    namespace        Namespace name, which can be copied under the Namespace tab in the console.
    group            Producer group name, which can be copied under the Group tab in the console.
    destination      Topic name, which can be copied under the Topic tab in the console.
    subExpression    A parameter used to set the message tag.
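
The subscription (or tags) value acts as a tag filter for the consumer. The plain-Java sketch below models that rule for illustration only. `TagFilterSketch` is a hypothetical class, not the binder's or the broker's code. It assumes the usual RocketMQ conventions that an empty value or `*` matches every message and that `||` separates alternative tags, and it further assumes that a message without a tag does not match a specific tag.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical model of tag filtering; not the RocketMQ implementation. */
final class TagFilterSketch {
    private TagFilterSketch() {
    }

    /** Returns true if a message carrying messageTag passes the subscription expression. */
    static boolean matches(String subscription, String messageTag) {
        if (subscription == null) {
            return true; // no filter configured: subscribe to all messages
        }
        String expr = subscription.trim();
        if (expr.isEmpty() || expr.equals("*")) {
            return true; // "*" also means all messages
        }
        if (messageTag == null) {
            return false; // assumption: untagged messages do not match a specific tag
        }
        // "TAG1 || TAG2" subscribes to either tag.
        Set<String> wanted = Arrays.stream(expr.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
        return wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("TAG1", "TAG1"));
        System.out.println(matches("TAG1", "TAG2"));
        System.out.println(matches("TAG1 || TAG2", "TAG2"));
    }
}
```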

    Step 3. Configure channels

    You can separately configure input and output channels as needed.

    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;

    /**
     * Custom channel binder
     */
    public interface CustomChannelBinder {

        /**
         * (Message producers) send messages
         * Bind the channel name in the configurations
         */
        @Output("Topic-send")
        MessageChannel sendChannel();

        /**
         * (Consumer 1) receives message 1
         * Bind the channel name in the configurations
         */
        @Input("Topic-test1")
        MessageChannel testInputChannel1();

        /**
         * (Consumer 2) receives message 2
         * Bind the channel name in the configurations
         */
        @Input("Topic-test2")
        MessageChannel testInputChannel2();
    }

    Step 4. Add annotations

    Add annotations to the configuration class or startup class. If multiple binders are configured, specify them in the annotations.

    @EnableBinding({CustomChannelBinder.class})

    Step 5. Send messages

    1. Inject CustomChannelBinder into the class that needs to send messages.
      @Autowired
      private CustomChannelBinder channelBinder;
    2. Use the corresponding output stream channel to send messages.
      Message<String> message = MessageBuilder.withPayload("This is a new message.").build();
      channelBinder.sendChannel().send(message);

    Step 6. Consume messages

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Service;

    @Service
    public class TestStreamConsumer {

        private static final Logger logger = LoggerFactory.getLogger(TestStreamConsumer.class);

        /**
         * Listen on the channel in the binder configurations
         *
         * @param messageBody message content
         */
        @StreamListener("Topic-test1")
        public void receive(String messageBody) {
            logger.info("Receive1: Messages are received through the stream. messageBody = {}", messageBody);
        }

        /**
         * Listen on the channel in the binder configurations
         *
         * @param messageBody message content
         */
        @StreamListener("Topic-test2")
        public void receive2(String messageBody) {
            logger.info("Receive2: Messages are received through the stream. messageBody = {}", messageBody);
        }
    }
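
With the configuration from Step 2, both listeners read from topic1, but in different groups and with different tags. The following is a rough in-memory model of that routing in plain Java, with no Spring or RocketMQ dependency. `TopicFanOutSketch`, `bind`, and `publish` are hypothetical names used only for this illustration, and each binding is assumed to subscribe to exactly one tag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

/** Hypothetical in-memory model of one topic with several consumer bindings. */
final class TopicFanOutSketch {

    /** One consumer binding: a group name, the tag it subscribes to, and its listener. */
    private static final class Binding {
        final String group;
        final String tag;
        final Consumer<String> listener;

        Binding(String group, String tag, Consumer<String> listener) {
            this.group = Objects.requireNonNull(group, "group");
            this.tag = Objects.requireNonNull(tag, "tag");
            this.listener = Objects.requireNonNull(listener, "listener");
        }
    }

    private final List<Binding> bindings = new ArrayList<>();

    void bind(String group, String tag, Consumer<String> listener) {
        bindings.add(new Binding(group, tag, listener));
    }

    /** Delivers the body to every binding whose tag equals the message tag; returns the groups reached. */
    List<String> publish(String tag, String body) {
        List<String> deliveredTo = new ArrayList<>();
        for (Binding binding : bindings) {
            if (binding.tag.equals(tag)) {
                binding.listener.accept(body);
                deliveredTo.add(binding.group);
            }
        }
        return deliveredTo;
    }

    public static void main(String[] args) {
        TopicFanOutSketch topic1 = new TopicFanOutSketch();
        topic1.bind("group1", "TAG1", body -> System.out.println("Receive1: " + body));
        topic1.bind("group2", "TAG2", body -> System.out.println("Receive2: " + body));
        topic1.publish("TAG1", "first message");
        topic1.publish("TAG2", "second message");
    }
}
```

In this model a message published without a matching tag reaches neither listener, which is worth keeping in mind when a consumer configured with a tag appears to receive nothing.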

    For more information, see the demo or Spring Cloud Stream documentation.
