tencent cloud

5.x 集群使用 5.x SDK 收发普通消息
最后更新时间:2025-07-23 14:09:42
5.x 集群使用 5.x SDK 收发普通消息
最后更新时间: 2025-07-23 14:09:42

操作场景

消息队列 RocketMQ 版支持多种语言的 SDK 收发不同类型的消息,本文以调用 Java SDK 为例介绍通过 5.x SDK 连接消息队列 RocketMQ 版服务端实现普通消息收发的操作过程。

前提条件

操作步骤

步骤1:安装 Java 依赖库

在 Java 项目中引入相关依赖,以 Maven 工程为例,在 pom.xml 添加以下依赖:
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
</dependencies>

步骤2:生产消息

在已创建的 Java 工程中,创建发送普通消息程序并运行。
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.java.example.AsyncProducerExample;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NormalMessageSyncProducer {
private static final Logger log = LoggerFactory.getLogger(NormalMessageSyncProducer.class);

private NormalMessageSyncProducer() {
}

public static void main(String[] args) throws ClientException, IOException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();

// 添加配置的ak和sk
String accessKey = "yourAccessKey"; //ak
String secretKey = "yourSecretKey"; //sk
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);

// 填写腾讯云提供的接入地址
String endpoints = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.enableSsl(false)
.setCredentialProvider(sessionCredentialsProvider)
.build();
String topic = "yourNormalTopic";
// 通常在一个客户端内无需创建过多的生产者。
final Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
// 设置主题名,此处的设置非必须,但是推荐设置,以便生产者可以在正式发送消息前,预先抓取消息路由。
.setTopics(topic)
// 如生产者未初始化可能会报 M {@link ClientException} 的错误。
.build();
// 此处定义消息主体。
byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
String tag = "yourMessageTagA";
final Message message = provider.newMessageBuilder()
// Set topic for the current message.
.setTopic(topic)
// 在 topic 下进行的消息二级分类,区别同一个主题内不同的消息。
.setTag(tag)
// 消息键,除消息 ID 外可以区别不同消息的其他途径。
.setKeys("yourMessageKey-1c151062f96e")
.setBody(body)
.build();
try {
final SendReceipt sendReceipt = producer.send(message);
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (Throwable t) {
log.error("Failed to send message", t);
}
// 发送完成后,如无别的需要可以关闭生产者客户端。
producer.close();
}
}
参数
说明
accessKey
角色密钥,在控制台的集群权限页面 AccessKey 列复制。

secretKey
角色名称,在控制台的集群权限页面 SecretKey 列复制。
endpoints
集群接入地址,在控制台集群基本信息页面的接入信息模块获取。

topic
Topic 的名称,在控制台 Topic 管理页面复制。


步骤3:消费消息

在已创建的 Java 工程中,创建订阅普通消息程序并运行。
TDMQ RocketMQ 5.x 系列支持两种消费模式,分别为 Push Consumer 和 Simple Consumer,以下代码示例以 Push Consumer 为例。
import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NormalPushConsumer {
private static final Logger log = LoggerFactory.getLogger(NormalPushConsumer.class);

private NormalPushConsumer() {
}

public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();

// 添加配置的 ak 和 sk
String accessKey = "yourAccessKey"; //ak
String secretKey = "yourSecretKey"; //sk
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);

// 填写腾讯云提供的接入地址
String endpoints = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.enableSsl(false)
.setCredentialProvider(sessionCredentialsProvider)
.build();
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
String consumerGroup = "yourConsumerGroup";
String topic = "yourTopic";
// 通常在一个客户端内无需创建过多的消费者。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 设置消费者组名称。
.setConsumerGroup(consumerGroup)
// 设置消费者订阅名称
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
// 处理消息并返回消息消费结果。
log.info("Consume message={}", messageView);
return ConsumeResult.SUCCESS;
})
.build();
// 生产环境无需阻塞主线程。
Thread.sleep(Long.MAX_VALUE);
// 消费完成后,如无别的需要可以关闭消费者客户端。
pushConsumer.close();
}
}
参数
说明
accessKey
角色密钥,在控制台的集群权限页面 AccessKey 列复制。

secretKey
角色名称,在控制台的集群权限页面 SecretKey 列复制。
endpoints
集群接入地址,在控制台集群基本信息页面的接入信息模块获取。

consumerGroup
消费者组名称,在控制台 Group 管理页面复制。

topic
Topic 的名称,在控制台 Topic 管理页面复制。


步骤4. 查看消息详情

发送完成消息后会得到一个消息 ID (messageID),您可以在控制台的消息查询 > 综合查询页面查询刚刚发送的消息,以及该消息的详情和轨迹等信息。


本页内容是否解决了您的问题?
您也可以 联系销售 提交工单 以寻求帮助。

文档反馈