RabbitMQ-Java-03-发布确认
说明
- RabbitMQ-Java-03-发布确认
- 本案例是一个Maven项目
- 假设你已经实现了上一节工作队列
- 官方文档已包含绝大多数本案例内容。请移步:https://docs.spring.io/spring-amqp/docs/current/reference/html/
核心概念
》原理
- 设置要求队列必须持久化
- 设置要求消息必须持久化
- 开启发布确认:channel.confirmSelect()
》分类
单个发布确认
- 同步确认发布的方式,发布一条原地等待确认,啥时候等到确认啥时候发布下一条
- 安全,效率最差
核心代码抽取
// 开启发布确认默认 channel.confirmSelect(); // 等待确认 boolean confirms = channel.waitForConfirms();
批量发布确认
- 把单个发布确认的动作改为定义一个批量值,每次发布消息数量达到这个值后触发一次确认
- 不安全,出问题后无法定位哪个消息出了问题,效率中等
核心代码抽取
// 定义多少条消息确认一次 int count = 100; // ---------- code... if ((i + 1) % count == 0) { // 等待确认 boolean confirms = channel.waitForConfirms(); }
异步发布确认
- 在发布消息前定义一个监听器,这个监听器可以监听发布成功回调、发布失败回调
- 发消息的时候尽管发不用管确认的事情,确认逻辑由监听器处理
- 安全,效率最好
核心代码抽取
// 应答成功确认回调 ConfirmCallback ackCallback = (long deliveryTag, boolean multiple) -> { System.out.println("[√] 消息应答成功:" + deliveryTag); }; // 应答失败确认回调 ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) -> { System.out.println("[×] 消息应答失败:" + deliveryTag); }; // 定义发布确认监听 channel.addConfirmListener(ackCallback, nackCallback);
异步发布确认处理应答失败消息
核心代码抽取
/* * 为了处理应答失败消息,定义一个线程安全有序的哈希表,适用于高并发 * 将序号与消息关联 * 根据序号批量删除条目 * 支持高并发(多线程) */ ConcurrentSkipListMap<Long, String> concurrentSkipListMap = new ConcurrentSkipListMap<>(); // 应答成功确认回调 ConfirmCallback ackCallback = (long deliveryTag, boolean multiple) -> { if (multiple) { ConcurrentNavigableMap<Long, String> headMap = concurrentSkipListMap.headMap(deliveryTag); headMap.clear(); } else { concurrentSkipListMap.remove(deliveryTag); } System.out.println("[√] 消息应答成功:" + deliveryTag); }; // 应答失败确认回调 ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) -> { System.out.println("[×] 消息应答失败:" + deliveryTag); System.out.println(concurrentSkipListMap); }; // --------------- code... // 发送消息 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("UTF-8")); // 记录消息发送记录到哈希表 concurrentSkipListMap.put(channel.getNextPublishSeqNo(), msg);
操作步骤
》完整代码
工具类:RabbitMqUtils
package cn.cnyasin.rabbit.utils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMqUtils { public static Channel getChannel() throws Exception { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 配置 factory.setHost("192.168.3.202"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); // 获取连接 Connection connection = factory.newConnection(); // 获取信道 Channel channel = connection.createChannel(); return channel; } }
生产者:Producer
package cn.cnyasin.rabbit.confirm; import cn.cnyasin.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmCallback; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; public class Producer { // 交换机名 public static final String EXCHANGE_NAME = "exchange_confirm"; // 队列名 public static final String QUEUE_NAME = "queue_confirm"; // 路由key public static final String ROUTING_KEY = "routing_confirm"; // 信道 public static Channel channel = null; public static void main(String[] args) throws Exception { // 单个发布确认 // publishOneConfirm(); // 执行结果:[*] 单个发布确认用时1137毫秒 // 批量发布确认 // publishBatchConfirm(); // 执行结果:[*] 批量发布确认用时227毫秒 // 异步发布确认 // publishAsyncConfirm(); // 执行结果:[*] 异步发布确认用时175毫秒 // 异步发布确认(处理应答失败消息) // publishAsyncConfirm2(); // 执行结果:[*] 异步发布确认用时179毫秒 } /** * 单个发布确认 * * @throws Exception */ public static void publishOneConfirm() throws Exception { // 初始化 init(); // 获取信道 Channel channel = getChannel(); // 开启发布确认默认 channel.confirmSelect(); // 开始时间 long start = System.currentTimeMillis(); // 循环发布消息1000条 for (int i = 0; i < 1000; i++) { String msg = "消息" + i; // 发送消息 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("UTF-8")); // 等待确认 boolean confirms = channel.waitForConfirms(); if (confirms) { System.out.println("[*] 消息[" + i + "]发送成功"); } } // 结束时间 long end = System.currentTimeMillis(); System.out.println("[*] 单个发布确认用时" + (end - start) + "毫秒"); } /** * 批量发布确认 * * @throws Exception */ public static void publishBatchConfirm() throws Exception { // 初始化 init(); // 获取信道 Channel channel = getChannel(); // 开启发布确认默认 channel.confirmSelect(); // 开始时间 long start = System.currentTimeMillis(); // 定义多少条消息确认一次 int count = 100; // 循环发布消息1000条 for (int i = 0; i < 1000; i++) { String msg = "消息" + i; // 发送消息 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("UTF-8")); if ((i + 1) % count == 0) { // 等待确认 boolean confirms = channel.waitForConfirms(); if (confirms) { System.out.println("[*] 消息[" + i + "]发送成功"); } } } // 结束时间 long end = System.currentTimeMillis(); System.out.println("[*] 批量发布确认用时" + (end - start) + "毫秒"); } /** * 异步发布确认 * * @throws Exception */ public static void publishAsyncConfirm() throws Exception { // 初始化 init(); // 获取信道 Channel channel = getChannel(); // 开启发布确认默认 channel.confirmSelect(); // 应答成功确认回调 ConfirmCallback ackCallback = (long deliveryTag, boolean multiple) -> { System.out.println("[√] 消息应答成功:" + deliveryTag); }; // 应答失败确认回调 ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) -> { System.out.println("[×] 消息应答失败:" + deliveryTag); }; // 定义发布确认监听 channel.addConfirmListener(ackCallback, nackCallback); // 开始时间 long start = System.currentTimeMillis(); // 循环发布消息1000条 for (int i = 0; i < 1000; i++) { String msg = "消息" + i; // 发送消息 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("UTF-8")); // System.out.println("[*] 消息[" + i + "]发送成功"); } // 结束时间 long end = System.currentTimeMillis(); System.out.println("[*] 异步发布确认用时" + (end - start) + "毫秒"); } /** * 异步发布确认(处理应答失败消息) * * @throws Exception */ public static void publishAsyncConfirm2() throws Exception { // 初始化 init(); // 获取信道 Channel channel = getChannel(); // 开启发布确认默认 channel.confirmSelect(); /* * 为了处理应答失败消息,定义一个线程安全有序的哈希表,适用于高并发 * 将序号与消息关联 * 根据序号批量删除条目 * 支持高并发(多线程) */ ConcurrentSkipListMap<Long, String> concurrentSkipListMap = new ConcurrentSkipListMap<>(); // 应答成功确认回调 ConfirmCallback ackCallback = (long deliveryTag, boolean multiple) -> { if (multiple) { ConcurrentNavigableMap<Long, String> headMap = concurrentSkipListMap.headMap(deliveryTag); headMap.clear(); } else { concurrentSkipListMap.remove(deliveryTag); } System.out.println("[√] 消息应答成功:" + deliveryTag); }; // 应答失败确认回调 ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) -> { System.out.println("[×] 消息应答失败:" + deliveryTag); System.out.println(concurrentSkipListMap); }; // 定义发布确认监听 channel.addConfirmListener(ackCallback, nackCallback); // 开始时间 long start = System.currentTimeMillis(); // 循环发布消息1000条 for (int i = 0; i < 1000; i++) { String msg = "消息" + i; // 发送消息 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("UTF-8")); // 记录消息发送记录到哈希表 concurrentSkipListMap.put(channel.getNextPublishSeqNo(), msg); } // 结束时间 long end = System.currentTimeMillis(); System.out.println("[*] 异步发布确认用时" + (end - start) + "毫秒"); } /** * 初始化交换机、队列、绑定路由key * * @throws Exception */ public static void init() throws Exception { // 获取信道 Channel channel = getChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 绑定队列、交换机、路由key channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); } public static Channel getChannel() throws Exception { if (channel == null) { setChannel(RabbitMqUtils.getChannel()); } return channel; } public static void setChannel(Channel channel) { Producer.channel = channel; } }
备注
- 该教程部分内容收集自网络,感谢原作者。
附录
- 无