RabbitMQ-Java-05-死信队列
说明
- RabbitMQ-Java-05-死信队列
- 本案例是一个Maven项目
- 假设你已经实现了上一节发布订阅模式
- 官方文档已包含绝大多数本案例内容。请移步:https://docs.spring.io/spring-amqp/docs/current/reference/html/
核心概念
》死信来源
- 消息TTL过期
- 队列满了
- 消息被拒绝
》死信实现过程概览
定义消费者01
- 声明两个交换机:正常交换机、死信交换机
声明两个队列:正常队列、死信队列
- 正常队列声明的时候传入第五个参数arguments:args,参数定义在arg中
- 声明两个路由key:正常路由key、死信路由key
- 分别绑定两组交换机、队列、路由key
核心代码抽取
// 正常队列附加参数 Map<String, Object> args = new HashMap<>(); // 正常队列设置TTL(不常用),一般都是消息发送方设置消息的TTL // args.put("x-message-ttl", 10000); //单位毫秒 // 正常队列关联设置死信交换机 args.put("x-dead-letter-exchange", EXCHANGE_NAME_DEAD); // 正常队列关联设置死信路由key args.put("x-dead-letter-routing-key", ROUTING_KEY_DEAD); // 声明队列 channel.queueDeclare(QUEUE_NAME_NORMAL, true, false, false, args); channel.queueDeclare(QUEUE_NAME_DEAD, true, false, false, null);
定义生产者
- 发送消息前,第三个参数需要传入props:props,将消息过期时间参数定义在props中
核心代码抽取
// 设置消息过期时间,单位毫秒 AMQP.BasicProperties props = new AMQP.BasicProperties().builder().expiration("10000").build(); // 发送消息 channel.basicPublish(EXCHANGE_NAME_NORMAL, ROUTING_KEY_NORMAL, props, msg.getBytes("UTF-8"));
- 注意:props定义的时候要严格按照上边的链式写法,如果你先new AMQP.BasicProperties(),然后在通过props.builder()....会导致过期时间参数不生效,builder()等方法返回的是新对象而不是改变props本身
操作步骤
》完整代码
RabbitMqUtils
- 参考我的本系列其他文章
Consumer01
package cn.cnyasin.rabbit.dead; import cn.cnyasin.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.*; import java.util.HashMap; import java.util.Map; public class Consumer01 { // 交换机名 public static final String EXCHANGE_NAME_NORMAL = "exchange.normal"; public static final String EXCHANGE_NAME_DEAD = "exchange.dead"; // 队列名 public static final String QUEUE_NAME_NORMAL = "queue.normal"; public static final String QUEUE_NAME_DEAD = "queue.dead"; // 路由key名 public static final String ROUTING_KEY_NORMAL = "routing.normal"; public static final String ROUTING_KEY_DEAD = "routing.dead"; // 信道 public static Channel channel = null; public static void main(String[] args) throws Exception { // 初始化 init(); Channel channel = getChannel(); // 接收消息成功回调 DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { System.out.println("[*] 成功收到消息:" + new String(message.getBody())); }; // 接收消息失败回调 CancelCallback cancelCallback = (String consumerTag) -> { System.out.println("[*] 接收消息失败"); }; System.out.println("[*] 正在等待接收消息。。。"); // 接收消息 channel.basicConsume(QUEUE_NAME_NORMAL, true, deliverCallback, cancelCallback); } /** * 初始化 * * @throws Exception */ public static void init() throws Exception { Channel channel = getChannel(); // 声明交换机(直接交换机) channel.exchangeDeclare(EXCHANGE_NAME_NORMAL, BuiltinExchangeType.DIRECT, true); channel.exchangeDeclare(EXCHANGE_NAME_DEAD, BuiltinExchangeType.DIRECT, true); // 正常队列附加参数 Map<String, Object> args = new HashMap<>(); // 正常队列设置TTL(不常用),一般都是消息发送方设置消息的TTL // args.put("x-message-ttl", 10000); //单位毫秒 // 正常队列关联设置死信交换机 args.put("x-dead-letter-exchange", EXCHANGE_NAME_DEAD); // 正常队列关联设置死信路由key args.put("x-dead-letter-routing-key", ROUTING_KEY_DEAD); // 声明队列 channel.queueDeclare(QUEUE_NAME_NORMAL, true, false, false, args); channel.queueDeclare(QUEUE_NAME_DEAD, true, false, false, null); // 绑定队列、交换机、路由key channel.queueBind(QUEUE_NAME_NORMAL, EXCHANGE_NAME_NORMAL, ROUTING_KEY_NORMAL); channel.queueBind(QUEUE_NAME_DEAD, EXCHANGE_NAME_DEAD, ROUTING_KEY_DEAD); } public static Channel getChannel() throws Exception { if (channel == null) { setChannel(RabbitMqUtils.getChannel()); } return channel; } public static void setChannel(Channel channel) { Consumer01.channel = channel; } }
Consumer02
- 死信消费者,正常消费者就好,本案例没写
Producer
package cn.cnyasin.rabbit.dead; import cn.cnyasin.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; public class Producer { // 信道 public static Channel channel = null; // 交换机名 public static final String EXCHANGE_NAME_NORMAL = "exchange.normal"; // 路由key名 public static final String ROUTING_KEY_NORMAL = "routing.normal"; public static void main(String[] args) throws Exception { Channel channel = getChannel(); for (int i = 0; i < 5; i++) { String msg = "你好啊:" + i; // 设置消息过期时间,单位毫秒 AMQP.BasicProperties props = new AMQP.BasicProperties().builder().expiration("10000").build(); // 发送消息 channel.basicPublish(EXCHANGE_NAME_NORMAL, ROUTING_KEY_NORMAL, props, msg.getBytes("UTF-8")); } System.exit(200); } public static Channel getChannel() throws Exception { if (channel == null) { setChannel(RabbitMqUtils.getChannel()); } return channel; } public static void setChannel(Channel channel) { Producer.channel = channel; } }
》运行
- 先运行Consumer01,然后stop
- 在运行Producer
- 登陆RabbitMQ后台查看,正常队列先有消息,10秒后消息全部进入死信队列,后续你只需要实现死信消费者即可。
备注
- 该教程部分内容收集自网络,感谢原作者。
附录
- 无