Release Notes
Announcements
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId><version>2021.0.4.0</version></dependency>
spring:cloud:stream:rocketmq:binder:# Full service address.name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876# Role name.secret-key: admin# Role token.access-key: eyJrZXlJZ...# Full namespace name.namespace: rocketmq-xxx|namespace1# producer groupgroup: producerGroupbindings:# Channel name, corresponding to the channel name under spring.cloud.stream.bindings.Topic-TAG1-Input:consumer:# Type of the tag subscribed to, which is configured based on the actual consumer situation. (By default, all messages are subscribed to.)subscription: TAG1# Channel name.Topic-TAG2-Input:consumer:subscription: TAG2bindings:# Channel name.Topic-send-Output:# Specify the topic, corresponding to the created topic name.destination: TopicTestcontent-type: application/json# Channel name.Topic-TAG1-Input:destination: TopicTestcontent-type: application/jsongroup: consumer-group1# Channel name.Topic-TAG2-Input:destination: TopicTestcontent-type: application/jsongroup: consumer-group2
2.2.5-RocketMQ-RC1 and 2.2.5.RocketMQ.RC2 and later versions support namespace configuration. If other versions are used, you need to concatenate the topic and group names.subscription for 2.2.5-RocketMQ-RC1 and 2.2.5.RocketMQ.RC2, and is tags for other earlier versions.spring:cloud:stream:rocketmq:bindings:# Channel name, corresponding to the channel name under spring.cloud.stream.bindings.Topic-test1:consumer:# Type of the tag subscribed to, which is configured based on the actual consumer situation. (By default, all messages are subscribed to.)tags: TAG1# Channel name.Topic-test2:consumer:tags: TAG2binder:# Full service address.name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876# Role name.secret-key: admin# Role token.access-key: eyJrZXlJZ...bindings:# Channel name.Topic-send:# Specify the topic, corresponding to the full name of the created topic. The format is cluster id|namespace name%topic name.destination: rocketmq-xxx|stream%topic1content-type: application/json# Use the full group name, which must be in the format of cluster id|namespace name%group name.group: rocketmq-xxx|stream%group1# Channel name.Topic-test1:destination: rocketmq-xxx|stream%topic1content-type: application/jsongroup: rocketmq-xxx|stream%group1# Channel name.Topic-test2:destination: rocketmq-xxx|stream%topic1content-type: application/jsongroup: rocketmq-xxx|stream%group2
Parameter | Description |
name-server | Cluster access address. You can obtain it from Access Address in the operation column on the Cluster Management page in the console. Namespace access point addresses of the shared and exclusive clusters of the new version can be obtained from the namespace list. |
secret-key | Role name. You can copy the role name from the SecretKey column on the Cluster Permissions page in the console. |
access-key | Role token. You can copy the token from the AccessKey column on the Cluster Permissions page in the console. |
namespace | Namespace name. You can copy the name from the Namespace page in the console. If you are using a 4.x general cluster or a 5.x cluster, specify the cluster ID for this parameter. |
group | Producer group name. You can copy the name from the Group Management page in the console. |
destination | Topic name. You can copy the name from the Topic Management page in the console. |
/*** Custom channel binder.*/public interface CustomChannelBinder {/*** Send messages (message producer).* Bind the channel name in the configuration.*/@Output("Topic-send-Output")MessageChannel sendChannel();/*** Receive message 1 (consumer 1).* Bind the channel name in the configuration.*/@Input("Topic-TAG1-Input")MessageChannel testInputChannel1();/*** Receive message 2 (consumer 2).* Bind the channel name in the configuration.*/@Input("Topic-TAG2-Input")MessageChannel testInputChannel2();}
@EnableBinding({CustomChannelBinder.class})
CustomChannelBinder into the class where messages are to be sent.@Autowiredprivate CustomChannelBinder channelBinder;
Message<String> message = MessageBuilder.withPayload("This is a new message.").build();channelBinder.sendChannel().send(message);
@Servicepublic class StreamConsumer {private final Logger logger = LoggerFactory.getLogger(StreamDemoApplication.class);/*** Listening channel (channel name in the configuration).** @param messageBody Message content.*/@StreamListener("Topic-TAG1-Input")public void receive(String messageBody) {logger.info("Receive1: Messages received through streams, messageBody = {}", messageBody);}/*** Listening channel (channel name in the configuration).** @param messageBody Message content.*/@StreamListener("Topic-TAG2-Input")public void receive2(String messageBody) {logger.info("Receive2: Messages received through streams, messageBody = {}", messageBody);}}
http://localhost:8080/test-simple through a browser to see that the sending was successful. Observe the output logs in the integrated development environment (IDE).2023-02-23 19:19:00.441 INFO 21958 --- [nio-8080-exec-1] c.t.d.s.controller.StreamController : Send: Message sent through stream, messageBody = GenericMessage [payload={"key":"value"}, headers={id=3f28bc70-da07-b966-a922-14a17642c9c4, timestamp=1677151140353}]2023-02-23 19:19:01.138 INFO 21958 --- [nsumer-group1_1] c.t.d.s.StreamDemoApplication : Receive1: Message received through streams, messageBody = {"headers":{"id":"3f28bc70-da07-b966-a922-14a17642c9c4","timestamp":1677151140353},"payload":{"key":"value"}}
Apakah halaman ini membantu?
Anda juga dapat Menghubungi Penjualan atau Mengirimkan Tiket untuk meminta bantuan.
masukan