自动创建重试&死信队列,系统会自动创建重试 Topic,该 Topic 会自主实现消息重试的机制。enableRetry 属性,就会自动订阅这个订阅名对应的重试队列。consumer.reconsumeLater接口之后,客户端内部检查消息对应的重试次数,如果达到指定的最大重试次数,消费被投递到死信队列(投递到死信队列的消息不会自动消费,如果需要,用户自己创建额外的消费者进行消费);如果没有达到最大重试次数,消费被投递到重试队列。重试间隔是通过延迟消息实现的,投递到重试队列的实际上是一个延迟消息,延迟时间就是用户在reconsumeLater中指定的时间。topic1 创建了一个 sub1 的订阅,客户端使用 sub1 订阅名订阅了 topic1 并开启了 enableRetry,如下所示:Consumer consumer = client.newConsumer().topic("persistent://1******30/my-ns/topic1").subscriptionType(SubscriptionType.Shared)//仅共享消费模式支持重试和死信.enableRetry(true).subscriptionName("sub1").subscribe();
topic1 对 sub1 的订阅就形成了带有重试机制的投递模式,sub1 会自动订阅之前在新建订阅时自动创建的重试 Topic 中(可以在控制台 Topic 列表中找到)。当 topic1 中的消息投递第一次未收到消费端 ACK 时,这条消息就会被自动投递到重试 Topic ,并且由于 consumer 自动订阅了这个主题,后续这条消息会在一定的 重试规则 下重新被消费。当达到最大重试次数后仍失败,消息会被投递到对应的死信队列,等待人工处理。[Topic 名称]-[订阅名]-RETRY[Topic 名称]-[订阅名]-DLQ [订阅名]-RETRY [订阅名]-DLQ[订阅名]-retry[订阅名]-dlqdeadLetterPolicy API 进行配置,代码如下:Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://pulsar-****").subscriptionName("sub1").subscriptionType(SubscriptionType.Shared).enableRetry(true)//开启重试消费.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount)//可以指定最大重试次数.retryLetterTopic("persistent://my-property/my-ns/sub1-retry")//可以指定重试队列.deadLetterTopic("persistent://my-property/my-ns/sub1-dlq")//可以指定死信队列.build()).subscribe();
reconsumerLater API 实现,有三种模式://指定任意延迟时间consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);//指定延迟等级consumer.reconsumeLater(msg, 1);//等级递增consumer.reconsumeLater(msg);
reconsumeLater(msg, 1)中的第二个参数即为消息等级。MESSAGE_DELAYLEVEL = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",这个常数决定了每级对应的延时时间,例如1级对应1s,3级对应10s。如果默认值不符合实际业务需求,用户可以重新自定义。MESSAGE_DELAYLEVEL 决定。{REAL_TOPIC="persistent://my-property/my-ns/test,ORIGIN_MESSAGE_ID=314:28:-1,RETRY_TOPIC="persistent://my-property/my-ns/my-subscription-retry,RECONSUMETIMES=16}
REAL_TOPIC:原 Topic。ORIGIN_MESSAGE_ID:最初生产的消息 ID。RETRY_TOPIC:重试 Topic。RECONSUMETIMES:代表该消息重试的次数。msg.getProperties().get("RECONSUMETIMES")
原始消费: msgid=1:1:0:1第一次重试: msgid=2:1:-1第二次重试: msgid=2:2:-1第三次重试: msgid=2:3:-1.......第16次重试: msgid=2:16:0:1第17次写入死信队列: msgid=3:1:-1
Consumer<byte[]> consumer1 = client.newConsumer().topic("persistent://pulsar-****").subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true)//开启重试消费//.deadLetterPolicy(DeadLetterPolicy.builder()// .maxRedeliverCount(maxRedeliveryCount)// .retryLetterTopic("persistent://my-property/my-ns/my-subscription-retry")//可以指定重试队列// .deadLetterTopic("persistent://my-property/my-ns/my-subscription-dlq")//可以指定死信队列// .build()).subscribe();
while (true) {Message msg = consumer.receive();try {// Do something with the messageSystem.out.printf("Message received: %s", new String(msg.getData()));// Acknowledge the message so that it can be deleted by the message brokerconsumer.acknowledge(msg);} catch (Exception e) {// select reconsume policyconsumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);//consumer.reconsumeLater(msg, 1);//consumer.reconsumeLater(msg);}}
Consumer<byte[]> consumer = client.newConsumer().topic("persistent://pulsar-****").subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared)// 默认1min.negativeAckRedeliveryDelay(1, TimeUnit.MINUTES).subscribe();while (true) {Message msg = consumer.receive();try {// Do something with the messageSystem.out.printf("Message received: %s", new String(msg.getData()));// Acknowledge the message so that it can be deleted by the message brokerconsumer.acknowledge(msg);} catch (Exception e) {// Message failed to process, redeliver laterconsumer.negativeAcknowledge(msg);}}
Consumer<byte[]> consumer = client.newConsumer().topic("persistent://pulsar-****").subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared)// 默认1min.negativeAckRedeliveryDelay(1, TimeUnit.MINUTES).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(5)//可以指定最大重试次数.deadLetterTopic("persistent://my-property/my-ns/sub1-dlq")//可以指定死信队列.build()).subscribe();while (true) {Message msg = consumer.receive();try {// Do something with the messageSystem.out.printf("Message received: %s", new String(msg.getData()));// Acknowledge the message so that it can be deleted by the message brokerconsumer.acknowledge(msg);} catch (Exception e) {// Message failed to process, redeliver laterconsumer.negativeAcknowledge(msg);}}
文档反馈