tencent cloud

文档反馈

发送与接收事务消息

最后更新时间:2023-11-24 14:30:01

    操作场景

    本文以调用 Java SDK 为例介绍通过开源 SDK 实现事务消息收发的操作过程。

    前提条件

    完成资源创建与准备(如果是全局顺序消息,需要创建单队列topic)

    操作步骤

    步骤1:安装 Java 依赖库

    在 Java 项目中引入相关依赖,以 maven 工程为例,在 pom.xml 添加以下依赖:
    说明
    依赖版本要求 ≥ 4.9.3, 当前建议为4.9.4
    <!-- in your <dependencies> block -->
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.4</version>
    </dependency>
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>4.9.4</version>
    </dependency>

    步骤2:生产消息

    实现 TransactionListener

    public class TransactionListenerImpl implements TransactionListener {
    
    //半消息发送成功后,回调该方法执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    //这里执行数据库事务,如果成功,就返回成功,否则返回未知,或者回滚,等待回查
    return LocalTransactionState.UNKNOW;
    }
    //回查本地事务
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    //这里查询本地db的数据状态,然后决定是否是否提交
    return LocalTransactionState.COMMIT_MESSAGE;
    }
    
    }

    创建消息生产者

    //需要用户实现一个TransactionListener 实例,
    TransactionListener transactionListener = new TransactionListenerImpl();
    // 实例化事务消息生产者
    ProducerTransactionMQProducer producer = new TransactionMQProducer("transaction_group",
    // ACL权限
    new AclClientRPCHook(new SessionCredentials(ClientCreater.ACCESS_KEY, ClientCreater.SECRET_KEY)));
    // 设置NameServer的地址
    producer.setNamesrvAddr(ClientCreater.NAMESERVER);
    producer.setTransactionListener(transactionListener);
    producer.start();
    参数
    说明
    groupName
    生产者组名称,建议使用对应的topic名字
    nameserver
    集群接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。
    secretKey
    角色名称,在 角色管理 页面复制。
    accessKey
    角色密钥,在 角色管理 页面复制密钥列复制。
    

    发送消息

    for (int i = 0; i < 3; i++) {
    // 构造消息示例
    Message msg = new Message(TOPIC_NAME, "your tag", "KEY" + i,("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));
    SendResult sendResult = producer.sendMessageInTransaction(msg,null);
    System.out.printf("%s%n", sendResult);
    }

    步骤3:消费消息

    创建消费者

    TDMQ RocketMQ 版支持 push 和 pull 两种消费模式。推荐Push消费模式
    // 实例化消费者
    DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(
    groupName,
    new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL权限
    // 设置NameServer的地址
    pushConsumer.setNamesrvAddr(nameserver);
    pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    // 消息处理逻辑
    System.out.printf("%s Receive transaction messages: %s %n", Thread.currentThread().getName(), msgs);
    // 标记该消息已经被成功消费
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    参数
    说明
    groupName
    生产者组名称,在控制台集群管理中Group 页签中复制。
    nameserver
    集群接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。
    secretKey
    角色名称,在 角色管理 页面复制。
    accessKey
    角色密钥,在 角色管理 页面复制密钥列复制。
    

    订阅消息

    根据消费模式不同,订阅方式也有所区别。
    // 订阅topic
    pushConsumer.subscribe(topic_name, "*");
    // 注册回调实现类来处理从broker拉取回来的消息
    pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    // 消息处理逻辑
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    // 标记该消息已经被成功消费, 根据消费情况,返回处理状态
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    // 启动消费者实例
    pushConsumer.start();

    步骤4:查看消费详情

    登录 TDMQ 控制台,在集群管理 > Group 页面,可查看与 Group 连接的客户端列表,单击操作列的查看消费者详情,可查看消费者详情。
    
    
    
    
    
    
    说明
    上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 GitHub DemoRocketMQ 官方文档
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持