// Declare a dead letter exchange.channel.exchangeDeclare("${dlxExchangeName}", "direct", true);// Declare an exchange for sending delayed messages.channel.exchangeDeclare("${delayExchangeName}", "direct", true);// Declare a queue for sending delayed messages and specify its dead letter exchange.Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "${dlxExchangeName}");channel.queueDeclare("${delayQueueName}", true, false, false, args);channel.queueBind("${delayQueueName}", "${delayExchangeName}", "");// Declare a dead letter queue.channel.queueDeclare("${delayQueueName}", true, false, false, null);channel.queueBind("${dlxQueueName}", "${dlxExchangeName}", "");// Send a delayed message.int delayInSeconds = 10; // Message delay time is 10s.AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration(String.valueOf(delayInSeconds * 1000));channel.basicPublish("${delayExchangeName}", "", props.build(), "delayed payload".getBytes());
Parameter | Description |
${dlxExchangeName} | Name of the dead letter exchange for delayed messages. Replace it with the name found in the exchange list in the console. |
${delayExchangeName} | Name of the exchange that sends delayed messages. Replace it with the name found in the exchange list in the console. |
x-dead-letter-exchange | Queue parameter key, which is used to set the dead letter exchange corresponding to the queue. |
${dlxQueueName} | Name of the dead letter queue for delayed messages. |
${delayQueueName} | Name of the queue that sends delayed messages. |
mandatory, meaning that the producer cannot detect unroutable messages through the basic.return event. Therefore, ensure that the corresponding exchanges, queues, and routing relationships exist before sending delayed messages.// Declare a delayed exchange.Map<String, Object> args = new HashMap<String, Object>();args.put("x-delayed-type", "direct");channel.exchangeDeclare("${delayedExchangeName}", "x-delayed-message", true, false, args);
Parameter | Description |
x-delayed-type | Exchange type, which specifies the routing rule. The value description is as follows: direct fanout topic |
${delayedExchangeName} | Exchange name: Replace it with the name that can be queried from the exchange list in the console. |
x-delayed-message | Specifies the exchange type to support delivering delayed messages. |
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");Map<String, Object> headers = new HashMap<String, Object>();headers.put("x-delay", 4000); // Message delay time is 4s.AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);channel.basicPublish("${delayedExchangeName}", "", props.build(), messageBodyBytes);
byte[] messageBodyBytes = "delayed message".getBytes("UTF-8"); // Define the message body.Map<String, Object> headers = new HashMap<String, Object>(); // Create a map collection to store custom header information for messages.headers.put("x-delay", 1000); //Add a delayed message key-value pair to the header and specify the message delay time as 1000 milliseconds.AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); //Set the header information to the message properties.channel.basicPublish("ExchangeName", "Routing Key", props.build(), messageBodyBytes); //Send the message to the specified exchange.
Feedback