tencent cloud

文档反馈

使用 5.0 SDK 收发普通消息

最后更新时间:2024-01-17 17:45:14

    操作场景

    本文以调用 Java SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
    说明
    以 Java 客户端为例说明,其他语言客户端请参见 SDK 文档

    前提条件

    操作步骤

    步骤1:安装 Java 依赖库

    在 Java 项目中引入相关依赖,以 Maven 工程为例,在 pom.xml 添加以下依赖:
    <dependencies>
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.5</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.7</version>
    </dependency>
    </dependencies>

    步骤2:生产消息

    public class NormalMessageSyncProducer {
    private static final Logger log = LoggerFactory.getLogger(NormalMessageSyncProducer.class);
    
    private NormalMessageSyncProducer() {
    }
    
    public static void main(String[] args) throws ClientException, IOException {
    final ClientServiceProvider provider = ClientServiceProvider.loadService();
    
    // 添加配置的ak和sk
    String accessKey = "yourAccessKey"; //ak
    String secretKey = "yourSecretKey"; //sk
    SessionCredentialsProvider sessionCredentialsProvider =
    new StaticSessionCredentialsProvider(accessKey, secretKey);
    
    // 填写腾讯云提供的接入地址
    String endpoints = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8081";
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    .setEndpoints(endpoints)
    .enableSsl(false)
    .setCredentialProvider(sessionCredentialsProvider)
    .build();
    String topic = "yourNormalTopic";
    // In most case, you don't need to create too many producers, singleton pattern is recommended.
    final Producer producer = provider.newProducerBuilder()
    .setClientConfiguration(clientConfiguration)
    // Set the topic name(s), which is optional but recommended. It makes producer could prefetch the topic
    // route before message publishing.
    .setTopics(topic)
    // May throw {@link ClientException} if the producer is not initialized.
    .build();
    // Define your message body.
    byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
    String tag = "yourMessageTagA";
    final Message message = provider.newMessageBuilder()
    // Set topic for the current message.
    .setTopic(topic)
    // Message secondary classifier of message besides topic.
    .setTag(tag)
    // Key(s) of the message, another way to mark message besides message id.
    .setKeys("yourMessageKey-1c151062f96e")
    .setBody(body)
    .build();
    try {
    final SendReceipt sendReceipt = producer.send(message);
    log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
    } catch (Throwable t) {
    log.error("Failed to send message", t);
    }
    // Close the producer when you don't need it anymore.
    producer.close();
    }
    }

    步骤3:消费消息

    腾讯云消息队列 TDMQ RocketMQ 版 5.x 系列支持两种消费模式,分别为 Push Consumer 和 Simple Consumer,以下代码示例以 Push Consumer 为例:
    public class NormalPushConsumer {
    private static final Logger log = LoggerFactory.getLogger(NormalPushConsumer.class);
    
    private NormalPushConsumer() {
    }
    
    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
    final ClientServiceProvider provider = ClientServiceProvider.loadService();
    
    // 添加配置的ak和sk
    String accessKey = "yourAccessKey"; //ak
    String secretKey = "yourSecretKey"; //sk
    SessionCredentialsProvider sessionCredentialsProvider =
    new StaticSessionCredentialsProvider(accessKey, secretKey);
    
    // 填写腾讯云提供的接入地址
    String endpoints = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8081";
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    .setEndpoints(endpoints)
    .enableSsl(false)
    .setCredentialProvider(sessionCredentialsProvider)
    .build();
    String tag = "*";
    FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    String consumerGroup = "yourConsumerGroup";
    String topic = "yourTopic";
    // In most case, you don't need to create too many consumers, singleton pattern is recommended.
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    // Set the consumer group name.
    .setConsumerGroup(consumerGroup)
    // Set the subscription for the consumer.
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .setMessageListener(messageView -> {
    // Handle the received message and return consume result.
    log.info("Consume message={}", messageView);
    return ConsumeResult.SUCCESS;
    })
    .build();
    // Block the main thread, no need for production environment.
    Thread.sleep(Long.MAX_VALUE);
    // Close the push consumer when you don't need it anymore.
    pushConsumer.close();
    }
    }

    步骤4:查看消息详情

    发送完成消息后会得到一个消息ID (messageID),开发者可以在 “消息查询” 页面查询刚刚发送的消息,如下图所示;并且可以查看特定消息的详情和轨迹等信息,详情见 消息查询 章节。
    
    
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持