Spring Cloud Stream Integration

Last updated: 2026-01-23 17:52:24

Scenarios

This document uses Spring Cloud Stream integration as an example to walk through the complete process of sending and receiving messages with TDMQ for RocketMQ.

Prerequisites

You have obtained the client connection parameters as instructed in SDK Overview.

Operation Steps

Step 1: Importing Dependencies

Add the spring-cloud-starter-stream-rocketmq dependency to pom.xml. Version 2021.0.4.0 is recommended.
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    <version>2021.0.4.0</version>
</dependency>

Step 2: Adding Configurations

Add configurations related to TDMQ for RocketMQ to the configuration file.
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          # Full service address.
          name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
          # Role name.
          secret-key: admin
          # Role token.
          access-key: eyJrZXlJZ...
          # Full namespace name.
          namespace: rocketmq-xxx|namespace1
          # Producer group.
          group: producerGroup
        bindings:
          # Channel name, corresponding to the channel name under spring.cloud.stream.bindings.
          Topic-TAG1-Input:
            consumer:
              # Tag to subscribe to. Set it according to what this consumer should receive. (By default, all messages are subscribed to.)
              subscription: TAG1
          # Channel name.
          Topic-TAG2-Input:
            consumer:
              subscription: TAG2
      bindings:
        # Channel name.
        Topic-send-Output:
          # Specify the topic, corresponding to the created topic name.
          destination: TopicTest
          content-type: application/json
        # Channel name.
        Topic-TAG1-Input:
          destination: TopicTest
          content-type: application/json
          group: consumer-group1
        # Channel name.
        Topic-TAG2-Input:
          destination: TopicTest
          content-type: application/json
          group: consumer-group2
Note
1. Namespace configuration is supported only in 2.2.5-RocketMQ-RC1, 2.2.5.RocketMQ.RC2, and later versions. With other versions, you need to concatenate the full namespace name with the topic and group names yourself.
The format is as follows:
rocketmq-pngrpmk94d5o|stream%topic (format: full namespace name%topic name)
rocketmq-pngrpmk94d5o|stream%group (format: full namespace name%group name)
The format for new virtual clusters and exclusive clusters is as follows:
MQ_INST_rocketmqpj79obd2ew7v_test%topic (format: full namespace name%topic name)
MQ_INST_rocketmqpj79obd2ew7v_test%group (format: full namespace name%group name)
2. The subscription configuration item is named subscription in 2.2.5-RocketMQ-RC1 and 2.2.5.RocketMQ.RC2, and tags in earlier versions.
For other versions, use the following full configuration:
spring:
  cloud:
    stream:
      rocketmq:
        bindings:
          # Channel name, corresponding to the channel name under spring.cloud.stream.bindings.
          Topic-test1:
            consumer:
              # Type of the tag subscribed to, which is configured based on the actual consumer situation. (By default, all messages are subscribed to.)
              tags: TAG1
          # Channel name.
          Topic-test2:
            consumer:
              tags: TAG2
        binder:
          # Full service address.
          name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
          # Role name.
          secret-key: admin
          # Role token.
          access-key: eyJrZXlJZ...
      bindings:
        # Channel name.
        Topic-send:
          # Specify the topic, corresponding to the full name of the created topic. The format is cluster id|namespace name%topic name.
          destination: rocketmq-xxx|stream%topic1
          content-type: application/json
          # Use the full group name, which must be in the format of cluster id|namespace name%group name.
          group: rocketmq-xxx|stream%group1
        # Channel name.
        Topic-test1:
          destination: rocketmq-xxx|stream%topic1
          content-type: application/json
          group: rocketmq-xxx|stream%group1
        # Channel name.
        Topic-test2:
          destination: rocketmq-xxx|stream%topic1
          content-type: application/json
          group: rocketmq-xxx|stream%group2
| Parameter | Description |
| --- | --- |
| name-server | Cluster access address. You can obtain it from Access Address in the operation column on the Cluster Management page in the console. Namespace access point addresses of the shared and exclusive clusters of the new version can be obtained from the namespace list. |
| secret-key | Role name. You can copy the role name from the SecretKey column on the Cluster Permissions page in the console. |
| access-key | Role token. You can copy the token from the AccessKey column on the Cluster Permissions page in the console. |
| namespace | Namespace name. You can copy the name from the Namespace page in the console. If you are using a 4.x general cluster or a 5.x cluster, specify the cluster ID for this parameter. |
| group | Producer group name. You can copy the name from the Group Management page in the console. |
| destination | Topic name. You can copy the name from the Topic Management page in the console. |
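
The note in Step 2 says that binder versions without namespace support need the full namespace name concatenated with each topic and group name. The following is a minimal sketch of that concatenation. The class FullNames and its qualify method are hypothetical helpers written for illustration and are not part of any SDK; the sample values are the ones shown in the note.

```java
// Hypothetical helper (not part of any SDK): builds the "full namespace name%name"
// strings described in the Step 2 note.
class FullNames {

    /** Joins a full namespace name and a topic or group name with '%'. */
    static String qualify(String fullNamespace, String name) {
        if (fullNamespace == null || fullNamespace.isEmpty()) {
            throw new IllegalArgumentException("full namespace name is required");
        }
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("topic or group name is required");
        }
        if (name.indexOf('%') >= 0) {
            // Guard against qualifying a name twice.
            throw new IllegalArgumentException("name already contains '%': " + name);
        }
        return fullNamespace + "%" + name;
    }

    public static void main(String[] args) {
        // Format shown first in the note: cluster id|namespace name%topic name.
        System.out.println(qualify("rocketmq-pngrpmk94d5o|stream", "topic"));
        // Format for new virtual clusters and exclusive clusters.
        System.out.println(qualify("MQ_INST_rocketmqpj79obd2ew7v_test", "group"));
    }
}
```

The resulting strings are what you would place in the destination and group items when the binder version does not support the namespace item.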

Step 3: Configuring Channels

Channels are classified into input and output channels, which can be individually configured based on business requirements.
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * Custom channel binder.
 */
public interface CustomChannelBinder {

    /**
     * Send messages (message producer).
     * Bind the channel name in the configuration.
     */
    @Output("Topic-send-Output")
    MessageChannel sendChannel();

    /**
     * Receive message 1 (consumer 1).
     * Bind the channel name in the configuration.
     */
    @Input("Topic-TAG1-Input")
    MessageChannel testInputChannel1();

    /**
     * Receive message 2 (consumer 2).
     * Bind the channel name in the configuration.
     */
    @Input("Topic-TAG2-Input")
    MessageChannel testInputChannel2();
}


Step 4: Adding Annotations

Add the following annotation to the configuration class or startup class. If you have multiple binder interfaces, list all of them in the annotation.
@EnableBinding({CustomChannelBinder.class})

Step 5: Sending Messages

1. Inject CustomChannelBinder into the class that sends messages.
    @Autowired
    private CustomChannelBinder channelBinder;
2. Send a message through the output channel.
    Message<String> message = MessageBuilder.withPayload("This is a new message.").build();
    channelBinder.sendChannel().send(message);

Step 6: Consuming Messages

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Service
public class StreamConsumer {
    private final Logger logger = LoggerFactory.getLogger(StreamDemoApplication.class);

    /**
     * Listening channel (channel name in the configuration).
     *
     * @param messageBody Message content.
     */
    @StreamListener("Topic-TAG1-Input")
    public void receive(String messageBody) {
        logger.info("Receive1: Message received through streams, messageBody = {}", messageBody);
    }

    /**
     * Listening channel (channel name in the configuration).
     *
     * @param messageBody Message content.
     */
    @StreamListener("Topic-TAG2-Input")
    public void receive2(String messageBody) {
        logger.info("Receive2: Message received through streams, messageBody = {}", messageBody);
    }
}

Step 7: Local Testing

Start the project locally and confirm in the console that it started successfully.
Then visit http://localhost:8080/test-simple in a browser to send a message, and check the output logs in the integrated development environment (IDE).
2023-02-23 19:19:00.441 INFO 21958 --- [nio-8080-exec-1] c.t.d.s.controller.StreamController : Send: Message sent through stream, messageBody = GenericMessage [payload={"key":"value"}, headers={id=3f28bc70-da07-b966-a922-14a17642c9c4, timestamp=1677151140353}]
2023-02-23 19:19:01.138 INFO 21958 --- [nsumer-group1_1] c.t.d.s.StreamDemoApplication : Receive1: Message received through streams, messageBody = {"headers":{"id":"3f28bc70-da07-b966-a922-14a17642c9c4","timestamp":1677151140353},"payload":{"key":"value"}}

The logs show that a message tagged TAG1 was sent and that only the consumer subscribed to TAG1 received it.
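
To illustrate why only the TAG1 consumer logged the message, here is a small sketch of tag matching. It assumes the standard RocketMQ tag expression syntax, where * subscribes to all tags and || separates alternatives. The class TagFilterSketch and its matches method are hypothetical and only model the filtering semantics; they are not the broker's or the binder's implementation.

```java
// Hypothetical model of tag-based filtering, for illustration only.
class TagFilterSketch {

    /** Returns true if a message carrying the given tag passes the subscription expression. */
    static boolean matches(String subscription, String tag) {
        // An empty expression or "*" subscribes to every message.
        if (subscription == null || subscription.trim().isEmpty() || subscription.trim().equals("*")) {
            return true;
        }
        // A message without a tag cannot satisfy a specific tag expression.
        if (tag == null) {
            return false;
        }
        // "TAG1 || TAG2" matches a message tagged with either alternative.
        for (String candidate : subscription.split("\\|\\|")) {
            if (candidate.trim().equals(tag)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String messageTag = "TAG1";
        // Topic-TAG1-Input subscribes to TAG1, Topic-TAG2-Input subscribes to TAG2.
        System.out.println("Topic-TAG1-Input receives: " + matches("TAG1", messageTag)); // true
        System.out.println("Topic-TAG2-Input receives: " + matches("TAG2", messageTag)); // false
    }
}
```

With the configuration from Step 2, the consumer in consumer-group1 corresponds to the first call and the consumer in consumer-group2 to the second, which is consistent with the single Receive1 line in the logs.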
Note
For complete usage, see the Demo or visit the official Spring Cloud Stream website.

