Constraints and Limitations
Method 2 below relies on the rabbitmq_delayed_message_exchange plugin to provide delayed messages on Managed Edition clusters. Enable the plugin on the plugin management page before you use this method. For details, see Managing Plugins.
The time value for delayed messages must be a non-negative integer in milliseconds.
If you have set a Time To Live (TTL) for your delayed message, the TTL is calculated as follows: Actual TTL of the delayed message = min{message TTL, queue TTL} + Delay time.
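The formula above can be checked with a short calculation. The following is a minimal sketch only; the class and method names are illustrative and are not part of any SDK.

```java
final class DelayedTtlSketch {
    /** Actual TTL = min(message TTL, queue TTL) + delay time, all in milliseconds. */
    static long effectiveTtlMillis(long messageTtlMillis, long queueTtlMillis, long delayMillis) {
        if (messageTtlMillis < 0 || queueTtlMillis < 0 || delayMillis < 0) {
            throw new IllegalArgumentException("TTL and delay values must be non-negative");
        }
        return Math.min(messageTtlMillis, queueTtlMillis) + delayMillis;
    }

    public static void main(String[] args) {
        // Message TTL 60 s, queue TTL 30 s, delay 5 s: min(60000, 30000) + 5000
        System.out.println(effectiveTtlMillis(60_000, 30_000, 5_000)); // prints 35000
    }
}
```

In this example the queue TTL is the smaller of the two TTL values, so the message expires 35 seconds after it is produced.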
Usage Methods
Method 1: Achieving Delayed Messages Through Setting Message Expiration Time and Dead Letter Queues
How It Works
The sender sets an expiration time on each message. When a message expires without being consumed, it is routed through the dead letter exchange to the dead letter queue.
The consumer receives delayed messages by consuming from the dead letter queue.
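One caveat of this approach, based on the documented RabbitMQ behavior for per-message TTL, is that an expired message is dead-lettered only when it reaches the head of the queue. A message with a short expiration time can therefore be held back by an earlier message with a longer one. The following sketch models this with a simplified FIFO queue that has no consumer. It is an illustration under that assumption, not broker code, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class HeadOfQueueExpirySketch {
    /**
     * Returns, in publish order, the time (ms) at which each message leaves the delay queue.
     * A message leaves only after it has expired AND every earlier message has left.
     */
    static List<Long> deadLetterTimes(long[] publishTimesMillis, long[] ttlsMillis) {
        if (publishTimesMillis.length != ttlsMillis.length) {
            throw new IllegalArgumentException("Each message needs a publish time and a TTL");
        }
        List<Long> result = new ArrayList<>();
        long previousLeaveTime = 0;
        for (int i = 0; i < ttlsMillis.length; i++) {
            long expiry = publishTimesMillis[i] + ttlsMillis[i];
            long leaveTime = Math.max(expiry, previousLeaveTime);
            result.add(leaveTime);
            previousLeaveTime = leaveTime;
        }
        return result;
    }

    public static void main(String[] args) {
        // Message A: published at 0 ms with a 10 s expiration.
        // Message B: published at 1000 ms with a 2 s expiration (expires at 3000 ms).
        System.out.println(deadLetterTimes(new long[] {0, 1_000}, new long[] {10_000, 2_000}));
        // prints [10000, 10000]: B waits behind A even though B expired at 3000 ms
    }
}
```

A common workaround is to use a separate delay queue for each distinct delay value, so that messages in the same queue expire in publish order.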
Usage Examples
The code below is for example purposes only. Parameters such as the exchange type and Routing Key should be replaced with values that meet your actual business requirements.
Sample code for the sender is as follows. The consumer subscribes to the dead letter queue.
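// Declare the dead letter exchange and the exchange that receives delayed messages.
channel.exchangeDeclare("${dlxExchangeName}", "direct", true);
channel.exchangeDeclare("${delayExchangeName}", "direct", true);
// Declare the delay queue. Expired messages are forwarded to the dead letter exchange.
// Do not attach a consumer to this queue.
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "${dlxExchangeName}");
channel.queueDeclare("${delayQueueName}", true, false, false, args);
channel.queueBind("${delayQueueName}", "${delayExchangeName}", "");
// Declare the dead letter queue that consumers subscribe to.
channel.queueDeclare("${dlxQueueName}", true, false, false, null);
channel.queueBind("${dlxQueueName}", "${dlxExchangeName}", "");
// Publish a message that expires after 10 seconds. The expiration is a string in milliseconds.
int delayInSeconds = 10;
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().expiration(String.valueOf(delayInSeconds * 1000));
channel.basicPublish("${delayExchangeName}", "", props.build(), "delayed payload".getBytes());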
A description of parameters is as follows:
Parameter | Description |
${dlxExchangeName} | Name of the dead letter exchange for delayed messages. Replace it with the name found in the exchange list in the console. |
${delayExchangeName} | Name of the exchange to which delayed messages are published. Replace it with the name found in the exchange list in the console. |
x-dead-letter-exchange | Queue parameter key, which sets the dead letter exchange for the queue. |
${dlxQueueName} | Name of the dead letter queue from which consumers receive delayed messages. |
${delayQueueName} | Name of the queue that holds delayed messages until they expire. |
Method 2: Achieving Delayed Messages Through the Built-in rabbitmq_delayed_message_exchange Plugin
Use Limits
The current design of the plugin is not suitable for scenarios with a large number of delayed messages (hundreds of thousands or even millions of messages awaiting delivery). Therefore, carefully evaluate the message throughput in a production environment to avoid unexpectedly long delays, message loss, and other issues.
Delayed messages have only one persistent replica on each node. If a node fails to operate properly (for example, a message backlog triggers continuous out-of-memory (OOM) errors, which cause restarts and prevent recovery), the delayed messages on that node cannot be consumed.
Delayed exchanges do not support the mandatory flag, so the producer cannot detect unroutable messages through the basic.return event. Therefore, ensure that the corresponding exchanges, queues, and bindings exist before you send delayed messages.
In summary, we strongly recommend that you avoid using this plugin and instead use dead letter queues to indirectly implement delayed messages. If you use this plugin despite its drawbacks, we strongly recommend that you keep the number of delayed messages as low as possible to avoid triggering high memory load issues.
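To evaluate throughput against the first limit above, you can roughly estimate how many delayed messages are pending at the same time as the publish rate multiplied by the average delay (Little's law, assuming a steady publish rate). The following sketch shows the arithmetic. The class name, method name, and sample numbers are illustrative assumptions, not measured values.

```java
final class PendingDelayedEstimate {
    /** Estimated number of delayed messages waiting at any moment under a steady publish rate. */
    static long pendingMessages(double publishRatePerSecond, long averageDelayMillis) {
        if (publishRatePerSecond < 0 || averageDelayMillis < 0) {
            throw new IllegalArgumentException("Rate and delay must be non-negative");
        }
        return Math.round(publishRatePerSecond * averageDelayMillis / 1000.0);
    }

    public static void main(String[] args) {
        // 500 delayed messages per second, each delayed by 10 minutes (600000 ms).
        System.out.println(pendingMessages(500, 600_000)); // prints 300000
    }
}
```

In this example, about 300,000 messages are pending at any time, which already falls in the range that the plugin is not designed for.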
Usage Examples
Delayed messages in TDMQ for RabbitMQ are used in exactly the same way as with the official RabbitMQ delayed message plugin, so you can migrate your business without modifications. For examples of sending and receiving messages, see Using the SDK to Send and Receive Messages. The parts that are specific to delayed messages are shown in the following example:
1. Declare an exchange and specify its routing type.
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("${delayedExchangeName}", "x-delayed-message", true, false, args);
A description of parameters is as follows:
Parameter | Description |
x-delayed-type | Exchange type, which specifies the routing rule. Valid values: direct, fanout, topic. |
${delayedExchangeName} | Exchange name. Replace it with the name found in the exchange list in the console. |
x-delayed-message | Exchange type that enables the delivery of delayed messages. |
2. Send a delayed message. Add the x-delay header (value: delay time in milliseconds) to the message properties, and publish the message to the exchange declared in the previous step.
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 4000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("${delayedExchangeName}", "", props.build(), messageBodyBytes);
When a message arrives at the exchange, it will be delivered to the corresponding queue after 4,000 milliseconds.
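Because the delay must be a non-negative integer in milliseconds, it can help to validate the value before it is placed in the header. The following is a minimal sketch that uses only the Java standard library. The DelayHeaders class and the forDelay method are hypothetical helpers, not part of the RabbitMQ client. The returned map can be passed to AMQP.BasicProperties.Builder.headers() as in the example above.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

final class DelayHeaders {
    /** Builds a header map that contains x-delay in milliseconds, rejecting negative delays. */
    static Map<String, Object> forDelay(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("Delay must be non-negative: " + delay);
        }
        // toIntExact throws ArithmeticException if the value does not fit in an int.
        int delayMillis = Math.toIntExact(delay.toMillis());
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-delay", delayMillis);
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(forDelay(Duration.ofSeconds(4))); // prints {x-delay=4000}
    }
}
```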
Constraints and Limitations
Serverless Edition clusters support delayed messages without enabling the open-source delayed message plugin.
The time value for delayed messages must be a non-negative integer in milliseconds.
If you have set a Time To Live (TTL) for your delayed message, the TTL is calculated as follows: Actual TTL of the delayed message = min{message TTL, queue TTL} + Delay time.
Serverless Edition clusters limit the maximum delay time of delayed messages. If the delay time of a message exceeds this limit, the message is treated as a normal message and delivered to consumers immediately after it is produced. For details about the maximum delay time in Serverless Edition clusters, see Use Limits.
Usage Methods
Step
When producing a message, add an x-delay key to the message header properties, with its value set to the message delay time in milliseconds.
Examples
byte[] messageBodyBytes = "delayed message".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 1000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("${exchangeName}", "${routingKey}", props.build(), messageBodyBytes);
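Because a delay that exceeds the Serverless Edition maximum causes the message to be delivered immediately, a producer may want to check the value before sending. The following sketch models that rule. The class and method names are hypothetical, and the limit used in main is a placeholder only; take the real value from Use Limits.

```java
final class ServerlessDelayCheck {
    /**
     * Returns the delay that would actually apply: the requested delay if it is within the limit,
     * or 0 (immediate delivery) if it exceeds the limit.
     */
    static long effectiveDelayMillis(long requestedDelayMillis, long maxDelayMillis) {
        if (requestedDelayMillis < 0) {
            throw new IllegalArgumentException("Delay must be non-negative");
        }
        return requestedDelayMillis > maxDelayMillis ? 0 : requestedDelayMillis;
    }

    public static void main(String[] args) {
        long placeholderMaxMillis = 86_400_000L; // placeholder, NOT the documented limit
        System.out.println(effectiveDelayMillis(1_000, placeholderMaxMillis)); // prints 1000
        System.out.println(effectiveDelayMillis(placeholderMaxMillis + 1, placeholderMaxMillis)); // prints 0
    }
}
```

A producer could log a warning or reject the request when the effective delay differs from the requested one, rather than silently sending a message that is delivered at once.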