tencent cloud

消息队列 Pulsar 版

动态与公告
新功能发布记录
集群版本更新记录
产品公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 Pulsar 版
产品优势
应用场景
技术原理
产品系列
开源 Pulsar 版本支持说明
与开源 Pulsar 对比
高可用
配额与限制
基础概念
产品计费
计费概述
价格说明
计费示例
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
使用 SDK 收发普通消息
使用 SDK 收发高级特性消息
用户指南
使用流程指引
配置账号权限
新建集群
配置命名空间
配置 Topic
连接集群
管理集群
查询消息及轨迹
跨地域复制
查看监控和配置告警
实践教程
客户端使用实践
异常消费者隔离
限流机制说明
交易对账
消息幂等性
消息压缩
迁移指南
单写多读集群迁移方案
虚拟集群平滑迁移至专业集群
API 参考
API 概览
SDK 参考
SDK 概述
SDK 配置参数推荐
TCP 协议(Pulsar 社区版)
安全与合规
权限管理
删除保护
云 API 审计
常见问题
监控相关
客户端相关
服务协议
服务等级协议
TDMQ 政策
联系我们
词汇表

消息重试与死信机制

PDF
聚焦模式
字号
最后更新时间: 2025-12-24 15:24:31
消息场景下,经常会发生消息发送失败、消息堆积超过预期等消息未被正常消费的场景。为应对这些场景,TDMQ Pulsar 版提供了消息重试和死信机制。

自动重试

自动重试 Topic 是一种为了确保消息被正常消费而设计的 Topic 。当某些消息第一次被消费者消费后,没有得到正常的回应,则会进入重试 Topic 中,当重试达到一定次数后,停止重试,投递到死信 Topic 中。
当消息进入到死信队列中,表示 TDMQ Pulsar 版已经无法自动处理这批消息,一般这时就需要人为介入来处理这批消息。您可以通过编写专门的客户端来订阅死信 Topic,处理这批之前处理失败的消息。

相关概念

重试 Topic:一个重试 Topic 对应一个订阅名(一个订阅者组的唯一标识),以 Topic 形式存在于 TDMQ Pulsar 版中。当您在控制台新建订阅,并打开自动创建重试&死信队列,系统会自动创建重试 Topic,该 Topic 会自主实现消息重试的机制。
该 Topic 命名为:
2.9.2 版本集群:[Topic 名称]-[订阅名]-RETRY
2.7.2 版本集群:[订阅名]-RETRY
2.6.1 版本集群:[订阅名]-retry

实现原理

您创建的消费者使用某个订阅名以共享模式订阅了一个 Topic 后,如果开启了 enableRetry 属性,就会自动订阅这个订阅名对应的重试队列。
当消费失败,调用consumer.reconsumeLater接口之后,客户端内部检查消息对应的重试次数,如果达到指定的最大重试次数,消息被投递到死信队列(投递到死信队列的消息不会自动消费,如果需要,用户自己创建额外的消费者进行消费);如果没有达到最大重试次数,消费被投递到重试队列。重试间隔是通过延迟消息实现的,投递到重试队列的实际上是一个延迟消息,延迟时间就是用户在reconsumeLater中指定的时间。
说明
仅共享模式(包括 Key 共享)支持自动重试和死信机制。
如果订阅模式是 Exclusive 或 Failover,指定的重试的时间间隔无效,会立即重试。本质是因为重试间隔是通过延迟消息功能实现的,但是Exclusive 或 Failover 模式下面不支持延迟消息。
注意客户端版本需要与集群版本保持一致,客户端才能准确识别自动创建出的重试、死信队列。
当使用 Token 访问重试/死信队列时,需要为消费者所使用角色赋予生产消息权限。
这里以 Java 语言客户端为例,在 topic1 创建了一个 sub1 的订阅,客户端使用 sub1 订阅名订阅了 topic1 并开启了 enableRetry,如下所示:
Consumer consumer = client.newConsumer()
.topic("persistent://1******30/my-ns/topic1")
.subscriptionType(SubscriptionType.Shared)//仅共享消费模式支持重试和死信
.enableRetry(true)
.subscriptionName("sub1")
.subscribe();
此时,topic1sub1 的订阅就形成了带有重试机制的投递模式,sub1 会自动订阅之前在新建订阅时自动创建的重试 Topic(可以在控制台 Topic 列表中找到)。当 topic1 中的消息投递第一次未收到消费端 ACK 时,这条消息就会被自动投递到重试 Topic ,并且由于 consumer 自动订阅了这个主题,后续这条消息会在一定的 重试规则 下重新被消费。当达到最大重试次数后仍失败,消息会被投递到对应的死信队列,等待人工处理。
说明
如果是 client 端自动创建的订阅,可以通过控制台上的 Topic 管理 > 更多 > 查看订阅进入消费管理页面手动重建重试和死信队列。



自定义参数设置

TDMQ Pulsar 版会默认配置一套重试和死信参数,具体如下:
2.9.2 版本集群
2.7.2 版本集群
2.6.1 版本集群
指定重试次数为16次(失败16次后,第17次会投递到死信队列)
指定重试队列为[Topic 名称]-[订阅名]-RETRY
指定死信队列为[Topic 名称]-[订阅名]-DLQ
指定重试次数为16次(失败16次后,第17次会投递到死信队列)
指定重试队列为 [订阅名]-RETRY
指定死信队列为 [订阅名]-DLQ
指定重试次数为16次(失败16次后,第17次会投递到死信队列)
指定重试队列为[订阅名]-retry
指定死信队列为[订阅名]-dlq
如果希望自定义配置这些参数,可以使用 deadLetterPolicy API 进行配置,代码如下:
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://pulsar-****")
.subscriptionName("sub1")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)//开启重试消费
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)//可以指定最大重试次数
.retryLetterTopic("persistent://my-property/my-ns/sub1-retry")//可以指定重试队列
.deadLetterTopic("persistent://my-property/my-ns/sub1-dlq")//可以指定死信队列
.build())
.subscribe();

重试规则

重试规则由 reconsumerLater API 实现,有三种模式:
//指定任意延迟时间
consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);
//指定延迟等级
consumer.reconsumeLater(msg, 1);
//等级递增
consumer.reconsumeLater(msg);
第一种:指定任意延迟时间。第二个参数填写延迟时间,第三个参数指定时间单位。延迟时间和延时消息的取值范围一致,范围在1 - 864000(单位:秒)。
第二种:指定任意延迟等级(仅限存量腾讯云版SDK的用户使用)。实现效果和第一种基本一致,更方便统一管理分布式系统中的延时时长,延迟等级说明如下:
1.1 reconsumeLater(msg, 1)中的第二个参数即为消息等级。
1.2 默认MESSAGE_DELAYLEVEL = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",这个常数决定了每级对应的延迟时间,例如1级对应1s,3级对应10s。如果默认值不符合实际业务需求,用户可以重新自定义。
第三种:等级递增(仅限存量腾讯云版 SDK 的用户使用)。实现的效果不同于以上两种,为退避式的重试,即第一次失败后重试间隔为1秒,第二次失败后重试间隔为5秒,以此类推,次数越多,间隔时间越长。具体时间间隔同样由第二种中介绍的 MESSAGE_DELAYLEVEL 决定。 这种重试机制往往在业务场景中有更实际的应用,如果消费失败,一般服务不会立刻恢复,使用这种渐进式的重试方式更为合理。
注意:
如果您使用的是 Pulsar 社区的 SDK,则不支持延迟等级和等级递增两种模式。

重试消息的消息属性

一条重试消息会给消息带上如下 property。
{
REAL_TOPIC="persistent://my-property/my-ns/test,
ORIGIN_MESSAGE_ID=314:28:-1,
RETRY_TOPIC="persistent://my-property/my-ns/my-subscription-retry,
RECONSUMETIMES=16
}
REAL_TOPIC:原 Topic。
ORIGIN_MESSAGE_ID:最初生产的消息 ID。
RETRY_TOPIC:重试 Topic。
RECONSUMETIMES:代表该消息重试的次数。

重试次数的获取方法

msg.getProperties().get("RECONSUMETIMES")
注意:
通过 msg.getRedeliveryCount() 接口获取到的是 negativeAcknowledge 重试方式下的重试次数。

重试消息的消息 ID 流转

消息 ID 流转过程如下所示,您可以借助此规则对相关日志进行分析。
原始消费: msgid=1:1:0:1
第一次重试: msgid=2:1:-1
第二次重试: msgid=2:2:-1
第三次重试: msgid=2:3:-1
.......
第16次重试: msgid=2:16:0:1
第17次写入死信队列: msgid=3:1:-1

完整代码示例

重试(-RETRY)Topic 需要在 Consumer 中首先开启该功能(enableRetry(true)),默认为关闭状态。之后需要调用 reconsumeLater() 的接口消息才会被发送到重试 Topic 中。
死信(-DLQ)Topic 需要调用  consumer.reconsumeLater(),执行 reconsumeLater 之后原 topic 的那条消息会被 ack,消息转存到 retry topic,重试到达上限后消息转存至死信。Pulsar Client 会自动订阅 retry topic,但是进入死信队列就不会自动订阅,需要用户自己来订阅。
以下为借助 TDMQ Pulsar 版实现完整消息重试机制的代码示例,供开发者参考。
订阅主题
Consumer<byte[]> consumer1 = client.newConsumer()
.topic("persistent://pulsar-****")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)//开启重试消费
//.deadLetterPolicy(DeadLetterPolicy.builder()
// .maxRedeliverCount(maxRedeliveryCount)
// .retryLetterTopic("persistent://my-property/my-ns/my-subscription-retry")//可以指定重试队列
// .deadLetterTopic("persistent://my-property/my-ns/my-subscription-dlq")//可以指定死信队列
// .build())
.subscribe();
执行消费
while (true) {
Message msg = consumer.receive();
try {
// Do something with the message
System.out.printf("Message received: %s", new String(msg.getData()));
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
} catch (Exception e) {
// select reconsume policy
consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);
//consumer.reconsumeLater(msg, 1);
//consumer.reconsumeLater(msg);
}
}

主动重试

当客户端消费某条消息失败,如果想重新消费到这条消息,消费者可以调用 negativeAcknowledge 接口。消息会在一段时间后重新被获取到,重新获取消息的间隔时间,可以通过 consumer 配置 negativeAckRedeliveryDelay 指定。

实现机制简述

客户端会缓存 negativeAcknowledge 的消息 id,客户端内部定期扫描 negativeAcknowledge 消息的列表(扫描间隔 negativeAckRedeliveryDelay * 1/3)。当有到达 negativeAckRedeliveryDelay 指定时间的消息之后,客户端通知服务端重新投递对应需要重试的消息,服务端接收到客户端的重新投递请求之后,重新推送对应的消息到客户端。
注意:
1. 实际重新接收到消息的时间可能会比 negativeAckRedeliveryDelay 指定的时间多 1/3,这里和客户端的实现逻辑相关。
2. 这种方式下,并没有产生新消息。
以下为主动重试的 Java 代码示例:
Consumer<byte[]> consumer = client.newConsumer()
.topic("persistent://pulsar-****")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
// 默认1min
.negativeAckRedeliveryDelay(1, TimeUnit.MINUTES)
.subscribe();


while (true) {
Message msg = consumer.receive();
try {
// Do something with the message
System.out.printf("Message received: %s", new String(msg.getData()));
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
} catch (Exception e) {
// Message failed to process, redeliver later
consumer.negativeAcknowledge(msg);
}
}

注意事项

1. negativeAcknowledge 的重试方式下,对应需要重试的消息在服务端看依然是 unack 的消息。
2. negativeAcknowledge 的重试方式下,默认没有最大重试次数。但是,可以通过配置最大重试次数和死信队列的方式实现。这种方式下,当一条消息重试了指定次数之后,会被投递到死信队列中。
Consumer<byte[]> consumer = client.newConsumer()
.topic("persistent://pulsar-****")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
// 默认1min
.negativeAckRedeliveryDelay(1, TimeUnit.MINUTES)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(5)//可以指定最大重试次数
.deadLetterTopic("persistent://my-property/my-ns/sub1-dlq")//可以指定死信队列
.build())
.subscribe();


while (true) {
Message msg = consumer.receive();
try {
// Do something with the message
System.out.printf("Message received: %s", new String(msg.getData()));
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
} catch (Exception e) {
// Message failed to process, redeliver later
consumer.negativeAcknowledge(msg);
}
}
3. negativeAcknowledge 的重试方式下,重试次数可以从 msg.getRedeliveryCount() 中获取。但是要注意,如果一个订阅下的消费者全部离线,那么消息的重试次数会被重置到 0(典型的场景:一个订阅下只有一个消费者,那么消费者重启之后,消息的重试次数都会重置为 0 )。

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈