<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId><version>2021.0.4.0</version></dependency>
spring:cloud:stream:rocketmq:binder:# 服务地址全称name-server: rmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080# 角色名称secret-key: admin# 角色密钥access-key: eyJrZXlJZ...# producer groupgroup: producerGroupbindings:# channel名称, 与spring.cloud.stream.bindings下的channel名称对应Topic-TAG1-Input:consumer:# 订阅的tag类型,根据消费者实际情况进行配置(默认是订阅所有消息)subscription: TAG1# channel名称Topic-TAG2-Input:consumer:subscription: TAG2bindings:# channel名称Topic-send-Output:# 指定topic, 对应创建的topic名称destination: TopicTestcontent-type: application/json# channel名称Topic-TAG1-Input:destination: TopicTestcontent-type: application/jsongroup: consumer-group1# channel名称Topic-TAG2-Input:destination: TopicTestcontent-type: application/jsongroup: consumer-group2
2.2.5-RocketMQ-RC1与 2.2.5.RocketMQ.RC2 及以上版本支持 namespace 配置,如使用别的版本需要对 topic 和 group 名称进行拼接。2.2.5-RocketMQ-RC1与 2.2.5.RocketMQ.RC2 的订阅配置项为 subscription , 其他低版本订阅配置项为 tags。spring:cloud:stream:rocketmq:bindings:# channel名称, 与spring.cloud.stream.bindings下的channel名称对应Topic-test1:consumer:# 订阅的tag类型,根据消费者实际情况进行配置(默认是订阅所有消息)tags: TAG1# channel名称Topic-test2:consumer:tags: TAG2binder:# 服务地址全称name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080# 角色名称secret-key: admin# 角色密钥access-key: eyJrZXlJZ...bindings:# channel名称Topic-send:# 指定topic,destination: topic1content-type: application/json# 要使用group全称group: group1# channel名称Topic-test1:destination: topic1content-type: application/jsongroup: group1# channel名称Topic-test2:destination: topic1content-type: application/jsongroup: group2
参数 | 说明 |
name-server | 集群接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。 |
secret-key | 角色名称,在控制台的角色管理页面 SecretKey 列复制。 |
access-key | 角色密钥,在控制台的角色管理页面 AccessKey 列复制。 ![]() |
namespace | 命名空间的名称,在控制台命名空间页面复制。如果您使用的是4.x通用集群或者5.x集群,此处可填写集群的 ID。 ![]() |
group | 生产者组名称,在控制台 Group 管理页面复制。 ![]() |
destination | Topic 的名称,在控制台 Topic 管理页面复制。 ![]() |
/*** 自定义通道 Binder*/public interface CustomChannelBinder {/*** 发送消息(消息生产者)* 绑定配置中的channel名称*/@Output("Topic-send-Output")MessageChannel sendChannel();/*** 接收消息1(消费者1)* 绑定配置中的channel名称*/@Input("Topic-TAG1-Input")MessageChannel testInputChannel1();/*** 接收消息2(消费者2)* 绑定配置中的channel名称*/@Input("Topic-TAG2-Input")MessageChannel testInputChannel2();}
@EnableBinding({CustomChannelBinder.class})
CustomChannelBinder。@Autowiredprivate CustomChannelBinder channelBinder;
Message<String> message = MessageBuilder.withPayload("This is a new message.").build();channelBinder.sendChannel().send(message);
@Servicepublic class StreamConsumer {private final Logger logger = LoggerFactory.getLogger(StreamDemoApplication.class);/*** 监听channel (配置中的channel 名称)** @param messageBody 消息内容*/@StreamListener("Topic-TAG1-Input")public void receive(String messageBody) {logger.info("Receive1: 通过stream收到消息,messageBody = {}", messageBody);}/*** 监听channel (配置中的channel 名称)** @param messageBody 消息内容*/@StreamListener("Topic-TAG2-Input")public void receive2(String messageBody) {logger.info("Receive2: 通过stream收到消息,messageBody = {}", messageBody);}}
http://localhost:8080/test-simple可以看到发送成功。观察开发 IDE 的输出日志。2023-02-23 19:19:00.441 INFO 21958 --- [nio-8080-exec-1] c.t.d.s.controller.StreamController : Send: 通过stream发送消息,messageBody = GenericMessage [payload={"key":"value"}, headers={id=3f28bc70-da07-b966-a922-14a17642c9c4, timestamp=1677151140353}]2023-02-23 19:19:01.138 INFO 21958 --- [nsumer-group1_1] c.t.d.s.StreamDemoApplication : Receive1: 通过stream收到消息,messageBody = {"headers":{"id":"3f28bc70-da07-b966-a922-14a17642c9c4","timestamp":1677151140353},"payload":{"key":"value"}}
文档反馈