<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.5</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.7</version></dependency></dependencies>
import java.io.IOException;import java.nio.charset.StandardCharsets;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.apache.rocketmq.client.apis.ClientConfiguration;import org.apache.rocketmq.client.apis.ClientException;import org.apache.rocketmq.client.apis.ClientServiceProvider;import org.apache.rocketmq.client.apis.SessionCredentialsProvider;import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;import org.apache.rocketmq.client.apis.message.Message;import org.apache.rocketmq.client.apis.producer.Producer;import org.apache.rocketmq.client.apis.producer.SendReceipt;import org.apache.rocketmq.client.java.example.AsyncProducerExample;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class NormalMessageSyncProducer {private static final Logger log = LoggerFactory.getLogger(NormalMessageSyncProducer.class);private NormalMessageSyncProducer() {}public static void main(String[] args) throws ClientException, IOException {final ClientServiceProvider provider = ClientServiceProvider.loadService();// 添加配置的ak和skString accessKey = "yourAccessKey"; //akString secretKey = "yourSecretKey"; //skSessionCredentialsProvider sessionCredentialsProvider =new StaticSessionCredentialsProvider(accessKey, secretKey);// 填写腾讯云提供的接入地址String endpoints = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8081";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).enableSsl(false).setCredentialProvider(sessionCredentialsProvider).build();String topic = "yourNormalTopic";// 通常在一个客户端内无需创建过多的生产者。final Producer producer = provider.newProducerBuilder().setClientConfiguration(clientConfiguration)// 设置主题名,此处的设置非必须,但是推荐设置,以便生产者可以在正式发送消息前,预先抓取消息路由。.setTopics(topic)// 如生产者未初始化可能会报 M {@link ClientException} 的错误。.build();// 此处定义消息主体。byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);String tag = "yourMessageTagA";final Message message = provider.newMessageBuilder()// Set topic for the current message..setTopic(topic)// 在 topic 下进行的消息二级分类,区别同一个主题内不同的消息。.setTag(tag)// 消息键,除消息 ID 外可以区别不同消息的其他途径。.setKeys("yourMessageKey-1c151062f96e").setBody(body).build();try {final SendReceipt sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());} catch (Throwable t) {log.error("Failed to send message", t);}// 发送完成后,如无别的需要可以关闭生产者客户端。producer.close();}}
参数 | 说明 |
accessKey | 角色密钥,在控制台的集群权限页面 AccessKey 列复制。 ![]() |
secretKey | 角色名称,在控制台的集群权限页面 SecretKey 列复制。 |
endpoints | 集群接入地址,在控制台集群基本信息页面的接入信息模块获取。 ![]() |
topic | Topic 的名称,在控制台 Topic 管理页面复制。 ![]() |
import java.io.IOException;import java.util.Collections;import org.apache.rocketmq.client.apis.ClientConfiguration;import org.apache.rocketmq.client.apis.ClientException;import org.apache.rocketmq.client.apis.ClientServiceProvider;import org.apache.rocketmq.client.apis.SessionCredentialsProvider;import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;import org.apache.rocketmq.client.apis.consumer.ConsumeResult;import org.apache.rocketmq.client.apis.consumer.FilterExpression;import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;import org.apache.rocketmq.client.apis.consumer.PushConsumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class NormalPushConsumer {private static final Logger log = LoggerFactory.getLogger(NormalPushConsumer.class);private NormalPushConsumer() {}public static void main(String[] args) throws ClientException, IOException, InterruptedException {final ClientServiceProvider provider = ClientServiceProvider.loadService();// 添加配置的 ak 和 skString accessKey = "yourAccessKey"; //akString secretKey = "yourSecretKey"; //skSessionCredentialsProvider sessionCredentialsProvider =new StaticSessionCredentialsProvider(accessKey, secretKey);// 填写腾讯云提供的接入地址String endpoints = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8081";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).enableSsl(false).setCredentialProvider(sessionCredentialsProvider).build();String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);String consumerGroup = "yourConsumerGroup";String topic = "yourTopic";// 通常在一个客户端内无需创建过多的消费者。PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration)// 设置消费者组名称。.setConsumerGroup(consumerGroup)// 设置消费者订阅名称.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).setMessageListener(messageView -> {// 处理消息并返回消息消费结果。log.info("Consume message={}", messageView);return ConsumeResult.SUCCESS;}).build();// 生产环境无需阻塞主线程。Thread.sleep(Long.MAX_VALUE);// 消费完成后,如无别的需要可以关闭消费者客户端。pushConsumer.close();}}
参数 | 说明 |
accessKey | 角色密钥,在控制台的集群权限页面 AccessKey 列复制。 ![]() |
secretKey | 角色名称,在控制台的集群权限页面 SecretKey 列复制。 |
endpoints | 集群接入地址,在控制台集群基本信息页面的接入信息模块获取。 ![]() |
consumerGroup | 消费者组名称,在控制台 Group 管理页面复制。 ![]() |
topic | Topic 的名称,在控制台 Topic 管理页面复制。 ![]() |

文档反馈