Spring Cloud Stream

Last updated: 2022-07-22 16:43:21

    Overview

    This document uses the Spring Cloud Stream SDK as an example to describe how to send and receive messages with an open-source SDK, helping you understand the message sending and receiving process.

    Prerequisites

    Directions

    Step 1. Add dependencies

    Add the Spring Cloud Stream RabbitMQ binder dependency to pom.xml.

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

    Step 2. Prepare configurations

    1. Edit the application configuration file. The following example configures a direct exchange.
      spring:
        application:
          name: application-name
        cloud:
          stream:
            rabbit:
              bindings:
                # Output channel name
                output:
                  # Producer configuration information
                  producer:
                    # Type of the exchange used by the producer. If the exchange already exists, this type must match its type
                    exchangeType: direct
                    # Routing key expression. Here the `routeTo` message header is used as the routing key
                    routing-key-expression: headers["routeTo"]
                    queueNameGroupOnly: true
                # Input channel name
                input:
                  # Consumer configuration information
                  consumer:
                    # Type of the exchange used by the consumer. If the exchange already exists, this type must match its type
                    exchangeType: direct
                    # Routing keys bound to the consumer message queue
                    bindingRoutingKey: info,waring,error
                    # When this delimiter is set, the routing keys above are split on it (here, on commas)
                    bindingRoutingKeyDelimiter: ","
                    # Message acknowledgment mode. For more information, see `AcknowledgeMode`
                    acknowledge-mode: manual
                    queueNameGroupOnly: true
            bindings:
              # Output channel name
              output:
                destination: direct_logs # Name of the exchange to be used
                content-type: application/json
                default-binder: dev-rabbit
              # Input channel name
              input:
                destination: direct_logs # Name of the exchange to be used
                content-type: application/json
                default-binder: dev-rabbit
                group: route_queue1 # Name of the message queue to be used
            binders:
              dev-rabbit:
                type: rabbit
                environment:
                  spring:
                    rabbitmq:
                      host: amqp-xxx.rabbitmq.xxx.tencenttdmq.com # Cluster access address, which can be obtained by clicking "Access Address" in the "Operation" column on the cluster management page
                      port: 5672
                      username: admin # Role name
                      password: password # Role token
                      virtual-host: vhostname # Vhost name
      Parameter            Description
      bindingRoutingKey    Routing keys bound to the consumer message queue. They are the routing rule of a message and can be obtained in the Binding Key column under the "Routing" tab on the cluster details page in the console.
      direct_logs          Exchange name, which can be obtained from the exchange list in the console.
      route_queue1         Queue name, which can be obtained from the queue list in the console.
      host                 Cluster access address, which can be obtained by clicking Access Address in the "Operation" column on the Cluster page in the console.
      port                 Cluster access port, which can be obtained by clicking Access Address in the "Operation" column of the cluster list on the Cluster page.
      username             Role name, which can be copied on the Role Management page.
      password             Role token, which can be copied in the Token column on the Role Management page.
      virtual-host         Vhost name in the format of "cluster ID + | + vhost name".
    2. Create the binding interfaces that expose the channels defined in the configuration file.
      • OutputMessageBinding.java
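        import org.springframework.cloud.stream.annotation.Output;
        import org.springframework.messaging.MessageChannel;

        public interface OutputMessageBinding {
            /**
             * Name of the channel to be used (output channel name)
             */
            String OUTPUT = "output";

            @Output(OUTPUT)
            MessageChannel output();
        }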
      • InputMessageBinding.java
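        import org.springframework.cloud.stream.annotation.Input;
        import org.springframework.messaging.SubscribableChannel;

        public interface InputMessageBinding {
            /**
             * Name of the channel to be used (input channel name)
             */
            String INPUT = "input";

            @Input(INPUT)
            SubscribableChannel input();
        }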
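
To make the effect of `exchangeType: direct`, `bindingRoutingKey`, and `bindingRoutingKeyDelimiter` in the configuration above more concrete, here is a minimal in-memory sketch of direct-exchange routing. It uses neither RabbitMQ nor Spring. The class `DirectRoutingSketch` and its `bind` and `route` methods are hypothetical names introduced only for illustration, and the splitting logic is an assumption about the intent of the delimiter setting rather than the binder's actual implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical in-memory model of a direct exchange; not RabbitMQ or Spring API.
final class DirectRoutingSketch {

    // Queue name -> binding keys, mirroring `group` and `bindingRoutingKey`.
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    // Splits the configured keys on the delimiter and binds each one to the queue.
    void bind(String queue, String bindingRoutingKey, String delimiter) {
        List<String> keys = new ArrayList<>();
        for (String key : bindingRoutingKey.split(Pattern.quote(delimiter))) {
            keys.add(key);
        }
        bindings.put(queue, keys);
    }

    // A direct exchange delivers to every queue whose binding key
    // equals the message's routing key exactly.
    List<String> route(String routingKey) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : bindings.entrySet()) {
            if (entry.getValue().contains(routingKey)) {
                matched.add(entry.getKey());
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("route_queue1", "info,waring,error", ",");
        System.out.println(exchange.route("info"));  // prints [route_queue1]
        System.out.println(exchange.route("debug")); // prints []
    }
}
```

In this model, a message whose routing key matches none of the bound keys reaches no queue, which is why the header value set by the producer has to match one of the keys listed in `bindingRoutingKey`.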

    Step 3. Send messages

    Create and compile the message sending program MessageSendProvider.java.

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.messaging.support.MessageBuilder;

    // Bind the output channel declared in OutputMessageBinding
    @EnableBinding(OutputMessageBinding.class)
    public class MessageSendProvider {

        @Autowired
        private OutputMessageBinding outputMessageBinding;

        // Sends one message per routing key; the "routeTo" header is used as the routing key
        public String sendToDirect() {
            outputMessageBinding.output().send(MessageBuilder.withPayload("[info] This is a new message.[" + System.currentTimeMillis() + "]").setHeader("routeTo", "info").build());
            outputMessageBinding.output().send(MessageBuilder.withPayload("[waring] This is a new waring message.[" + System.currentTimeMillis() + "]").setHeader("routeTo", "waring").build());
            outputMessageBinding.output().send(MessageBuilder.withPayload("[error] This is a new error message.[" + System.currentTimeMillis() + "]").setHeader("routeTo", "error").build());
            return "success";
        }

        // Sends messages without a routing key header, as used with a fanout exchange
        public String sendToFanout() {
            for (int i = 0; i < 3; i++) {
                outputMessageBinding.output().send(MessageBuilder.withPayload("This is a new message" + i).build());
            }
            return "success";
        }
    }

    Inject MessageSendProvider into the class that needs to send messages, and call its methods to send them.

    Step 4. Consume messages

    Create and compile the message consuming program MessageConsumer.java. You can configure multiple channels to listen on different message queues.

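    import com.rabbitmq.client.Channel;
    import java.io.IOException;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Service;

    @Service
    @EnableBinding(InputMessageBinding.class)
    public class MessageConsumer {

        @StreamListener(InputMessageBinding.INPUT)
        public void test(Message<String> message) throws IOException {
            // With acknowledge-mode: manual, the channel and delivery tag are passed in the message headers
            Channel channel = (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
            Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            // Acknowledge this single message (multiple = false)
            channel.basicAck(deliveryTag, false);
            String payload = message.getPayload();
            System.out.println(payload);
        }
    }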
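
Because the configuration sets `acknowledge-mode: manual`, the consumer decides when a message counts as handled. The sample above acknowledges before it uses the payload. A common alternative is to acknowledge only after processing succeeds and to reject the message otherwise. The following is a minimal sketch of that ordering, not the behavior of the sample. `AckChannel` is a hypothetical stand-in that mirrors the `basicAck` and `basicNack` signatures of `com.rabbitmq.client.Channel` so that the example runs without a broker. `ManualAckSketch`, `RecordingChannel`, and `process` are likewise illustrative names.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class ManualAckSketch {

    // Hypothetical stand-in mirroring two methods of com.rabbitmq.client.Channel.
    interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
        void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
    }

    // Test double that records what the handler asked the broker to do.
    static final class RecordingChannel implements AckChannel {
        final List<String> events = new ArrayList<>();

        @Override
        public void basicAck(long deliveryTag, boolean multiple) {
            events.add("ack:" + deliveryTag);
        }

        @Override
        public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
            events.add("nack:" + deliveryTag);
        }
    }

    // Processes the payload first, then acknowledges only on success.
    static void handle(AckChannel channel, long deliveryTag, String payload) throws IOException {
        try {
            process(payload);
            channel.basicAck(deliveryTag, false);
        } catch (RuntimeException e) {
            // requeue = false: the message is not put back on the queue
            channel.basicNack(deliveryTag, false, false);
        }
    }

    // Illustrative business logic: rejects empty payloads, prints the rest.
    static void process(String payload) {
        if (payload == null || payload.isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
        System.out.println(payload);
    }

    public static void main(String[] args) throws IOException {
        RecordingChannel channel = new RecordingChannel();
        handle(channel, 1L, "[info] This is a new message.");
        handle(channel, 2L, "");
        System.out.println(channel.events); // prints [ack:1, nack:2]
    }
}
```

In a real consumer, the channel and delivery tag would come from the `AmqpHeaders.CHANNEL` and `AmqpHeaders.DELIVERY_TAG` headers, as in MessageConsumer.java. Whether to requeue a rejected message depends on whether the failure is expected to be transient.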

    Step 5. View messages

    To confirm that the messages were sent to TDMQ for RabbitMQ, check the status of connected consumers on the Cluster > Queue page in the console.

    Note:

    The above is a sample based on the pub/sub pattern of RabbitMQ. Adjust the configuration as needed. For more information, see Demo or Rabbit Producer Properties.
