新功能发布记录
公告
// 声明死信交换机channel.exchangeDeclare("${dlxExchangeName}", "direct", true);// 声明用于发送延时消息的交换机channel.exchangeDeclare("${delayExchangeName}", "direct", true);// 声明用于发送延时消息的队列,并指定其死信交换机Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "${dlxExchangeName}");channel.queueDeclare("${delayQueueName}", true, false, false, args);channel.queueBind("${delayQueueName}", "${delayExchangeName}", "");// 声明死信队列channel.queueDeclare("${delayQueueName}", true, false, false, null);channel.queueBind("${dlxQueueName}", "${dlxExchangeName}", "");// 发送延时消息int delayInSeconds = 10; // 消息延时 10sAMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration(String.valueOf(delayInSeconds * 1000));channel.basicPublish("${delayExchangeName}", "", props.build(), "delayed payload".getBytes());
参数 | 说明 |
${dlxExchangeName} | 用于延迟消息的死信交换机名称,请替换为可在控制台 Exchange 列表查询到的名称。 |
${delayExchangeName} | 实际发送延时消息的交换机名称,请替换为可在控制台 Exchange 列表查询到的名称。 |
x-dead-letter-exchange | 队列参数 key,用于设置其对应的死信交换机。 |
${dlxQueueName} | 用于延迟消息的死信队列名称 |
${delayQueueName} | 实际发送延时消息的队列名称 |
mandatory ,生产者无法通过 basic.return 事件感知到无法路由的消息,因此发送延时消息前请务必保证对应的交换机、队列、路由关系存在。// 声明延时交换机Map<String, Object> args = new HashMap<String, Object>();args.put("x-delayed-type", "direct");channel.exchangeDeclare("${delayedExchangeName}", "x-delayed-message", true, false, args);
参数 | 说明 |
x-delayed-type | Exchange 的类型,指定路由规则。取值说明如下: direct fanout topic |
${delayedExchangeName} | Exchange 的名称,请替换为可在控制台 Exchange 列表查询到的名称。 |
x-delayed-message | 指定 Exchange 类型,以支持投递延时消息。 |
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");Map<String, Object> headers = new HashMap<String, Object>();headers.put("x-delay", 4000); // 消息延时 4sAMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);channel.basicPublish("${delayedExchangeName}", "", props.build(), messageBodyBytes);
byte[] messageBodyBytes = "delayed message".getBytes("UTF-8"); //定义消息主体Map<String, Object> headers = new HashMap<String, Object>(); //创建Map集合,存储消息的自定义Header信息headers.put("x-delay", 1000); //向Header添加延时消息键值对,指定消息的延时时间为1000毫秒。AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); //将Header信息设置到消息属性中channel.basicPublish("ExchangeName", "Routing Key", props.build(), messageBodyBytes); //把消息发送到指定的Exchange
文档反馈