Automatically Create Retry & Dead Letter Queues

When a subscription is created, the system automatically creates a retry topic. This topic implements the message retry mechanism on its own.

When the enableRetry property is enabled on the client, the retry queue corresponding to the subscription name is subscribed to automatically.

When the consumer.reconsumeLater API is called, the client internally checks how many times the message has been retried. If the number of retries has reached the maximum limit, the message is delivered to the dead letter queue. (Messages in the dead letter queue are not consumed automatically. If required, create additional consumers to consume them.) If the number of retries has not reached the maximum limit, the message is delivered to the retry queue. The retry interval is implemented through delayed messages: a delayed message is delivered to the retry queue, and the delay time is the one specified by the user in reconsumeLater.

For example, a subscription sub1 is created for topic1, and the client subscribes to topic1 using the subscription name sub1 with enableRetry enabled, as follows:

    Consumer consumer = client.newConsumer()
        .topic("persistent://1******30/my-ns/topic1")
        // Only the Shared mode supports the retry and dead letter mechanisms.
        .subscriptionType(SubscriptionType.Shared)
        .enableRetry(true)
        .subscriptionName("sub1")
        .subscribe();
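The routing decision made when reconsumeLater is called can be illustrated with a small stand-alone sketch. It does not use the client SDK: the class RetryRoutingSketch, the Target enum and the route method are hypothetical names, and treating "reaches the maximum limit" as "retries so far >= limit" is an assumption.

```java
// Hypothetical sketch of the retry/dead-letter routing check.
// This is NOT the SDK's actual implementation.
final class RetryRoutingSketch {

    enum Target { RETRY_QUEUE, DEAD_LETTER_QUEUE }

    // reconsumeTimes: number of retries the message has already gone through.
    // maxRedeliverCount: configured maximum number of retries.
    // Assumption: the limit is "reached" once reconsumeTimes >= maxRedeliverCount.
    static Target route(int reconsumeTimes, int maxRedeliverCount) {
        if (reconsumeTimes >= maxRedeliverCount) {
            return Target.DEAD_LETTER_QUEUE;
        }
        return Target.RETRY_QUEUE;
    }

    public static void main(String[] args) {
        int maxRedeliverCount = 3;
        for (int retries = 0; retries <= maxRedeliverCount; retries++) {
            System.out.println(retries + " retries so far -> " + route(retries, maxRedeliverCount));
        }
    }
}
```

With a limit of 3, the first three calls send the message to the retry queue and the fourth sends it to the dead letter queue.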
The subscription of sub1 to topic1 then forms a delivery mode with a retry mechanism. sub1 automatically subscribes to the retry topic that was created when the subscription was first created (it is listed in the topic list of the console). When a message in topic1 is not acknowledged by the consumer after its first delivery, it is automatically delivered to the retry topic. Because the consumer is subscribed to this topic automatically, the message is consumed again according to the retry rules. If consumption still fails after the maximum number of retries is reached, the message is delivered to the corresponding dead letter queue for manual handling.

The retry and dead letter topics are named in one of the following forms:

    Retry topic: [Topic name]-[Subscription name]-RETRY
    Dead letter topic: [Topic name]-[Subscription name]-DLQ

    Retry topic: [Subscription name]-RETRY
    Dead letter topic: [Subscription name]-DLQ

    Retry topic: [Subscription name]-retry
    Dead letter topic: [Subscription name]-dlq

These settings can be customized through the deadLetterPolicy API. The code is as follows:

    Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic("persistent://pulsar-****")
        .subscriptionName("sub1")
        .subscriptionType(SubscriptionType.Shared)
        .enableRetry(true) // Enable retry consumption.
        .deadLetterPolicy(DeadLetterPolicy.builder()
            .maxRedeliverCount(maxRedeliveryCount) // Specify the maximum number of retries.
            .retryLetterTopic("persistent://my-property/my-ns/sub1-retry") // Specify the retry queue.
            .deadLetterTopic("persistent://my-property/my-ns/sub1-dlq") // Specify the dead letter queue.
            .build())
        .subscribe();
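To make the naming forms concrete, the following sketch builds retry and dead letter topic names from a topic name and a subscription name. It is only an illustration: the class RetryTopicNamesSketch, the Style enum and both methods are hypothetical, and which form applies to a given cluster is not something the sketch can determine.

```java
// Hypothetical helper that formats names according to the three forms above.
final class RetryTopicNamesSketch {

    // One constant per naming form; the constant names are made up for this sketch.
    enum Style { TOPIC_AND_SUBSCRIPTION_UPPER, SUBSCRIPTION_UPPER, SUBSCRIPTION_LOWER }

    static String retryTopic(Style style, String topic, String subscription) {
        switch (style) {
            case TOPIC_AND_SUBSCRIPTION_UPPER:
                return topic + "-" + subscription + "-RETRY";
            case SUBSCRIPTION_UPPER:
                return subscription + "-RETRY";
            default:
                return subscription + "-retry";
        }
    }

    static String deadLetterTopic(Style style, String topic, String subscription) {
        switch (style) {
            case TOPIC_AND_SUBSCRIPTION_UPPER:
                return topic + "-" + subscription + "-DLQ";
            case SUBSCRIPTION_UPPER:
                return subscription + "-DLQ";
            default:
                return subscription + "-dlq";
        }
    }

    public static void main(String[] args) {
        for (Style style : Style.values()) {
            System.out.println(retryTopic(style, "topic1", "sub1")
                    + " / " + deadLetterTopic(style, "topic1", "sub1"));
        }
    }
}
```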
Retry rules are specified through the reconsumeLater API. Three modes are available.

    // Specify any delay time.
    consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);
    // Specify the delay level.
    consumer.reconsumeLater(msg, 1);
    // Use level-based incremental increase.
    consumer.reconsumeLater(msg);
In the second mode, the second parameter in reconsumeLater(msg, 1) is the message level. By default, MESSAGE_DELAYLEVEL = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h". This constant determines the delay time of each level. For example, level 1 corresponds to 1s, and level 3 corresponds to 10s. If the default value does not meet actual business requirements, users can customize it.

In the third mode, the delay time of each retry is determined by the MESSAGE_DELAYLEVEL constant introduced in the second mode.
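The mapping from a delay level to a delay time can be sketched by parsing the MESSAGE_DELAYLEVEL string. The class DelayLevelsSketch and its methods are hypothetical and only mirror the default value quoted above; the SDK's own parsing may differ.

```java
import java.time.Duration;

// Hypothetical sketch: resolves a 1-based delay level against the default
// MESSAGE_DELAYLEVEL string quoted in the text.
final class DelayLevelsSketch {

    static final String MESSAGE_DELAYLEVEL =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Converts a token such as "30s", "5m" or "2h" into a Duration.
    static Duration parse(String token) {
        long value = Long.parseLong(token.substring(0, token.length() - 1));
        char unit = token.charAt(token.length() - 1);
        switch (unit) {
            case 's': return Duration.ofSeconds(value);
            case 'm': return Duration.ofMinutes(value);
            case 'h': return Duration.ofHours(value);
            default: throw new IllegalArgumentException("Unknown unit in " + token);
        }
    }

    // Levels are 1-based: level 1 is the first token in the string.
    static Duration delayForLevel(int level) {
        String[] tokens = MESSAGE_DELAYLEVEL.split(" ");
        if (level < 1 || level > tokens.length) {
            throw new IllegalArgumentException("Level out of range: " + level);
        }
        return parse(tokens[level - 1]);
    }

    public static void main(String[] args) {
        System.out.println(delayForLevel(1)); // prints PT1S (1 second)
        System.out.println(delayForLevel(3)); // prints PT10S (10 seconds)
    }
}
```

The default string contains 18 levels, so level 18 resolves to 2 hours and any higher level is rejected in this sketch.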
This retry mechanism is more practical in actual business scenarios: when consumption fails, services typically do not recover immediately, so a progressive retry is more reasonable.

A retried message carries properties such as the following:

    {REAL_TOPIC="persistent://my-property/my-ns/test,ORIGIN_MESSAGE_ID=314:28:-1,RETRY_TOPIC="persistent://my-property/my-ns/my-subscription-retry,RECONSUMETIMES=16}
    REAL_TOPIC: original topic.
    ORIGIN_MESSAGE_ID: ID of the originally produced message.
    RETRY_TOPIC: retry topic.
    RECONSUMETIMES: number of message retries.

The number of retries can be read from the message properties:

    msg.getProperties().get("RECONSUMETIMES")
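Because the property value is a string, it usually needs to be parsed before it can be compared with a limit. The sketch below works on a plain Map<String, String> shaped like the result of msg.getProperties(). The class and method names are hypothetical, and returning 0 when the property is absent (a message that has not been retried yet) is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: extracts the retry count from a message's properties map.
final class RetryPropertiesSketch {

    // Assumption: a message that has never been retried has no RECONSUMETIMES
    // property, so a missing value is treated as 0.
    static int reconsumeTimes(Map<String, String> properties) {
        String raw = properties.get("RECONSUMETIMES");
        return raw == null ? 0 : Integer.parseInt(raw);
    }

    public static void main(String[] args) {
        // Sample values taken from the property dump shown above.
        Map<String, String> properties = new HashMap<>();
        properties.put("REAL_TOPIC", "persistent://my-property/my-ns/test");
        properties.put("ORIGIN_MESSAGE_ID", "314:28:-1");
        properties.put("RETRY_TOPIC", "persistent://my-property/my-ns/my-subscription-retry");
        properties.put("RECONSUMETIMES", "16");

        System.out.println("Retries so far: " + reconsumeTimes(properties));
    }
}
```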
    Original consumption: msgid=1:1:0:1
    First retry: msgid=2:1:-1
    Second retry: msgid=2:2:-1
    Third retry: msgid=2:3:-1
    .......
    16th retry: msgid=2:16:0:1
    Written to dead letter queue at the 17th retry: msgid=3:1:-1
    Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://pulsar-****")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Shared)
        .enableRetry(true) // Enable retry consumption.
        // Optional: uncomment to customize the retry and dead letter queues.
        // .deadLetterPolicy(DeadLetterPolicy.builder()
        //     .maxRedeliverCount(maxRedeliveryCount)
        //     .retryLetterTopic("persistent://my-property/my-ns/my-subscription-retry") // Specify the retry queue.
        //     .deadLetterTopic("persistent://my-property/my-ns/my-subscription-dlq") // Specify the dead letter queue.
        //     .build())
        .subscribe();

    while (true) {
        Message msg = consumer.receive();
        try {
            // Do something with the message
            System.out.printf("Message received: %s", new String(msg.getData()));
            // Acknowledge the message so that it can be deleted by the message broker
            consumer.acknowledge(msg);
        } catch (Exception e) {
            // Select a reconsume policy
            consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);
            // consumer.reconsumeLater(msg, 1);
            // consumer.reconsumeLater(msg);
        }
    }
    Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://pulsar-****")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Shared)
        // 1 minute by default.
        .negativeAckRedeliveryDelay(1, TimeUnit.MINUTES)
        .subscribe();

    while (true) {
        Message msg = consumer.receive();
        try {
            // Do something with the message
            System.out.printf("Message received: %s", new String(msg.getData()));
            // Acknowledge the message so that it can be deleted by the message broker
            consumer.acknowledge(msg);
        } catch (Exception e) {
            // Message failed to process, redeliver later
            consumer.negativeAcknowledge(msg);
        }
    }
    Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://pulsar-****")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Shared)
        // 1 minute by default.
        .negativeAckRedeliveryDelay(1, TimeUnit.MINUTES)
        .deadLetterPolicy(DeadLetterPolicy.builder()
            .maxRedeliverCount(5) // Specify the maximum number of retries.
            .deadLetterTopic("persistent://my-property/my-ns/sub1-dlq") // Specify the dead letter queue.
            .build())
        .subscribe();

    while (true) {
        Message msg = consumer.receive();
        try {
            // Do something with the message
            System.out.printf("Message received: %s", new String(msg.getData()));
            // Acknowledge the message so that it can be deleted by the message broker
            consumer.acknowledge(msg);
        } catch (Exception e) {
            // Message failed to process, redeliver later
            consumer.negativeAcknowledge(msg);
        }
    }