tencent cloud

Feedback

Spring Cloud Stream

Last updated: 2023-09-12 17:53:17

    Overview

    This document describes how to use Spring Cloud Stream to send and receive messages and helps you better understand the message sending and receiving processes.

    Prerequisites

    You have created the required resources as instructed in Resource Creation and Preparation.
    You have downloaded the demo here or have downloaded one at the GitHub project.

    Directions

    Step 1. Import dependencies

    Import spring-cloud-starter-stream-rocketmq-related dependencies in pom.xml. It is recommended to use v2021.0.4.0.
    <dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    <version>2021.0.4.0</version>
    </dependency>

    Step 2. Add configurations

    Add RocketMQ-related configurations to the configuration file.
    spring:
    cloud:
    stream:
    rocketmq:
    binder:
    # Full service address
    name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
    # Role name
    secret-key: admin
    # Role token
    access-key: eyJrZXlJZ...
    # Full namespace name
    namespace: rocketmq-xxx|namespace1
    # producer group
    group: producerGroup
    bindings:
    # Channel name, which is the same as the channel name in spring.cloud.stream.bindings.
    Topic-TAG1-Input:
    consumer:
    # Tag type of the subscription, which is configured based on consumers’ actual needs. All messages are subscribed to by default.
    subscription: TAG1
    # Channel name
    Topic-TAG2-Input:
    consumer:
    subscription: TAG2
    bindings:
    # Channel name
    Topic-send-Output:
    # Specify a topic, which refers to the one you created
    destination: TopicTest
    content-type: application/json
    # Channel name
    Topic-TAG1-Input:
    destination: TopicTest
    content-type: application/json
    group: consumer-group1
    # Channel name
    Topic-TAG2-Input:
    destination: TopicTest
    content-type: application/json
    group: consumer-group2
    Note
    1. Currently, only 2.2.5-RocketMQ-RC1 and 2.2.5.RocketMQ.RC2 or later versions support namespace configuration. If you use other versions, you need to concatenate topic and group names.
    The format is as follows:
    rocketmq-pngrpmk94d5o|stream%topic (format: namespace name %topic name)
    rocketmq-pngrpmk94d5o|stream%group (format: namespace name%group name)
    The format for Shared and Exclusive editions is as follows:
    MQ_INST_rocketmqpj79obd2ew7v_test%topic (format: namespace name%topic name)
    MQ_INST_rocketmqpj79obd2ew7v_test%group (format: namespace name%group name)
    2. The subscription configuration item is subscription for 2.2.5-RocketMQ-RC1 and 2.2.5.RocketMQ.RC2 and is tags for other earlier versions.
    The complete configuration items of other versions are as follows:
    spring:
    cloud:
    stream:
    rocketmq:
    bindings:
    # Channel name, which is the same as the channel name in spring.cloud.stream.bindings.
    Topic-test1:
    consumer:
    # Tag type of the subscription, which is configured based on consumers’ actual needs. All messages are subscribed to by default.
    tags: TAG1
    # Channel name
    Topic-test2:
    consumer:
    tags: TAG2
    binder:
    # Full service address
    name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
    # Role name
    secret-key: admin
    # Role token
    access-key: eyJrZXlJZ...
    bindings:
    # Channel name
    Topic-send:
    # Specify a topic in the format of `cluster ID|namespace name%topic name`, which refers to the one you created
    destination: rocketmq-xxx|stream%topic1
    content-type: application/json
    # Name of the group to be used in the format of `cluster ID|namespace name%group name`
    group: rocketmq-xxx|stream%group1
    # Channel name
    Topic-test1:
    destination: rocketmq-xxx|stream%topic1
    content-type: application/json
    group: rocketmq-xxx|stream%group1
    # Channel name
    Topic-test2:
    destination: rocketmq-xxx|stream%topic1
    content-type: application/json
    group: rocketmq-xxx|stream%group2
    Parameter
    Description
    name-server
    Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list.
    secret-key
    Role name, which can be copied on the Role Management page.
    access-key
    Role token, which can be copied in the Token column on the Role Management page.
    
    namespace
    Namespace name, which can be copied on the Namespace page in the console.
    group
    Producer group name, which can be copied under the Group tab on the cluster details page.
    destination
    Topic name, which can be copied on the Topic page in the console.

    Step 3. Configure channels

    You can separately configure input and output channels as needed.
    /**
    * Custom channel binder
    */
    public interface CustomChannelBinder {
    
    /**
    * (Message producers) send messages
    * Bind the channel name in the configurations
    */
    @Output("Topic-send-Output")
    MessageChannel sendChannel();
    
    
    /**
    * (Consumer 1) receives message 1
    * Bind the channel name in the configurations
    */
    @Input("Topic-TAG1-Input")
    MessageChannel testInputChannel1();
    
    /**
    * (Consumer 2) receives message 2
    * Bind the channel name in the configurations
    */
    @Input("Topic-TAG2-Input")
    MessageChannel testInputChannel2();
    }
    

    Step 4. Add annotations

    Add annotations to the configuration class or startup class. If multiple binders are configured, specify them in the annotations.
    @EnableBinding({CustomChannelBinder.class})

    Step 5. Send messages

    1. Inject CustomChannelBinder into the class that needs to send messages.
    @Autowired
    private CustomChannelBinder channelBinder;
    2. Use the corresponding output stream channel to send messages.
    Message<String> message = MessageBuilder.withPayload("This is a new message.").build();
    channelBinder.sendChannel().send(message);

    Step 6. Consume messages

    @Service
    public class StreamConsumer {
    private final Logger logger = LoggerFactory.getLogger(StreamDemoApplication.class);
    
    /**
    * Listen on the channel configured in the configurations
    *
    * @param messageBody message content
    */
    @StreamListener("Topic-TAG1-Input")
    public void receive(String messageBody) {
    logger.info("Receive1: Messages are received through the stream. messageBody = {}", messageBody);
    }
    
    /**
    * Listen on the channel configured in the configurations
    *
    * @param messageBody message content
    */
    @StreamListener("Topic-TAG2-Input")
    public void receive2(String messageBody) {
    logger.info("Receive2: Messages are received through the stream. messageBody = {}", messageBody);
    }
    }

    Step 7: Perform local testing

    After starting the project locally, you can see from the console that the startup was successful.
    You can see that the sending is successful by checking http://localhost:8080/test-simple in the browser. Watch the output log of the development IDE.
    2023-02-23 19:19:00.441 INFO 21958 --- [nio-8080-exec-1] c.t.d.s.controller.StreamController : Send: send a message via stream, messageBody = GenericMessage [payload={"key":"value"}, headers={id=3f28bc70-da07-b966-a922-14a17642c9c4, timestamp=1677151140353}]
    2023-02-23 19:19:01.138 INFO 21958 --- [nsumer-group1_1] c.t.d.s.StreamDemoApplication : Receive1: receive a message via stream, messageBody = {"headers":{"id":"3f28bc70-da07-b966-a922-14a17642c9c4","timestamp":1677151140353},"payload":{"key":"value"}}
    
    You can see that a message of TAG1 is sent, and only the subscribers of TAG1 receive the message.
    
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support