说明

核心概念

》原理

  • 设置要求队列必须持久化
  • 设置要求消息必须持久化
  • 开启发布确认:channel.confirmSelect()

》分类

  • 单个发布确认

    • 同步确认发布的方式,发布一条原地等待确认,啥时候等到确认啥时候发布下一条
    • 安全,效率最差
    • 核心代码抽取

      // 开启发布确认默认
      channel.confirmSelect();
      
      // 等待确认
      boolean confirms = channel.waitForConfirms();
  • 批量发布确认

    • 把单个发布确认的动作改为定义一个批量值,每次发布消息数量达到这个值后触发一次确认
    • 不安全,出问题后无法定位哪个消息出了问题,效率中等
    • 核心代码抽取

      // 定义多少条消息确认一次
      int count = 100;
      
      // ---------- code...
      
      if ((i + 1) % count == 0) {
          // 等待确认
          boolean confirms = channel.waitForConfirms();
      }
  • 异步发布确认

    • 在发布消息前定义一个监听器,这个监听器可以监听发布成功回调、发布失败回调
    • 发消息的时候尽管发不用管确认的事情,确认逻辑由监听器处理
    • 安全,效率最好
    • 核心代码抽取

      // 应答成功确认回调
      ConfirmCallback ackCallback = (long deliveryTag, boolean multiple) -> {
          System.out.println("[√] 消息应答成功:" + deliveryTag);
      };
      
      // 应答失败确认回调
      ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) -> {
          System.out.println("[×] 消息应答失败:" + deliveryTag);
      };
      
      // 定义发布确认监听
      channel.addConfirmListener(ackCallback, nackCallback);
  • 异步发布确认处理应答失败消息

    • 核心代码抽取

      /*
      * 为了处理应答失败消息,定义一个线程安全有序的哈希表,适用于高并发
      * 将序号与消息关联
      * 根据序号批量删除条目
      * 支持高并发(多线程)
      */
      ConcurrentSkipListMap<Long, String> concurrentSkipListMap = new ConcurrentSkipListMap<>();
      
      // 应答成功确认回调
      ConfirmCallback ackCallback = (long deliveryTag, boolean multiple) -> {
          if (multiple) {
              ConcurrentNavigableMap<Long, String> headMap = concurrentSkipListMap.headMap(deliveryTag);
              headMap.clear();
          } else {
              concurrentSkipListMap.remove(deliveryTag);
          }
      
          System.out.println("[√] 消息应答成功:" + deliveryTag);
      };
      
      // 应答失败确认回调
      ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) -> {
          System.out.println("[×] 消息应答失败:" + deliveryTag);
          System.out.println(concurrentSkipListMap);
      };
      
      // --------------- code...
      
      // 发送消息
      channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("UTF-8"));
      
      // 记录消息发送记录到哈希表
      concurrentSkipListMap.put(channel.getNextPublishSeqNo(), msg);

操作步骤

》完整代码

  • 工具类:RabbitMqUtils

    package cn.cnyasin.rabbit.utils;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class RabbitMqUtils {
        public static Channel getChannel() throws Exception {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
    
            // 配置
            factory.setHost("192.168.3.202");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("123456");
            factory.setVirtualHost("/");
    
            // 获取连接
            Connection connection = factory.newConnection();
    
            // 获取信道
            Channel channel = connection.createChannel();
    
            return channel;
        }
    }
    
  • 生产者:Producer

    package cn.cnyasin.rabbit.confirm;
    
    import cn.cnyasin.rabbit.utils.RabbitMqUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmCallback;
    
    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;
    
    public class Producer {
        // 交换机名
        public static final String EXCHANGE_NAME = "exchange_confirm";
    
        // 队列名
        public static final String QUEUE_NAME = "queue_confirm";
    
        // 路由key
        public static final String ROUTING_KEY = "routing_confirm";
    
        // 信道
        public static Channel channel = null;
    
        public static void main(String[] args) throws Exception {
            // 单个发布确认
            // publishOneConfirm(); // 执行结果:[*] 单个发布确认用时1137毫秒
    
            // 批量发布确认
            // publishBatchConfirm(); // 执行结果:[*] 批量发布确认用时227毫秒
    
            // 异步发布确认
            // publishAsyncConfirm(); // 执行结果:[*] 异步发布确认用时175毫秒
    
            // 异步发布确认(处理应答失败消息)
            // publishAsyncConfirm2(); // 执行结果:[*] 异步发布确认用时179毫秒
    
        }
    
        /**
         * 单个发布确认
         *
         * @throws Exception
         */
        public static void publishOneConfirm() throws Exception {
            // 初始化
            init();
    
            // 获取信道
            Channel channel = getChannel();
    
            // 开启发布确认默认
            channel.confirmSelect();
    
            // 开始时间
            long start = System.currentTimeMillis();
    
            // 循环发布消息1000条
            for (int i = 0; i < 1000; i++) {
                String msg = "消息" + i;
                // 发送消息
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("UTF-8"));
    
                // 等待确认
                boolean confirms = channel.waitForConfirms();
                if (confirms) {
                    System.out.println("[*] 消息[" + i + "]发送成功");
                }
            }
    
            // 结束时间
            long end = System.currentTimeMillis();
    
            System.out.println("[*] 单个发布确认用时" + (end - start) + "毫秒");
        }
    
        /**
         * 批量发布确认
         *
         * @throws Exception
         */
        public static void publishBatchConfirm() throws Exception {
            // 初始化
            init();
    
            // 获取信道
            Channel channel = getChannel();
    
            // 开启发布确认默认
            channel.confirmSelect();
    
            // 开始时间
            long start = System.currentTimeMillis();
    
            // 定义多少条消息确认一次
            int count = 100;
    
            // 循环发布消息1000条
            for (int i = 0; i < 1000; i++) {
                String msg = "消息" + i;
                // 发送消息
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("UTF-8"));
    
                if ((i + 1) % count == 0) {
                    // 等待确认
                    boolean confirms = channel.waitForConfirms();
                    if (confirms) {
                        System.out.println("[*] 消息[" + i + "]发送成功");
                    }
                }
            }
    
            // 结束时间
            long end = System.currentTimeMillis();
    
            System.out.println("[*] 批量发布确认用时" + (end - start) + "毫秒");
        }
    
        /**
         * 异步发布确认
         *
         * @throws Exception
         */
        public static void publishAsyncConfirm() throws Exception {
            // 初始化
            init();
    
            // 获取信道
            Channel channel = getChannel();
    
            // 开启发布确认默认
            channel.confirmSelect();
    
            // 应答成功确认回调
            ConfirmCallback ackCallback = (long deliveryTag, boolean multiple) -> {
                System.out.println("[√] 消息应答成功:" + deliveryTag);
            };
    
            // 应答失败确认回调
            ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) -> {
                System.out.println("[×] 消息应答失败:" + deliveryTag);
            };
    
            // 定义发布确认监听
            channel.addConfirmListener(ackCallback, nackCallback);
    
            // 开始时间
            long start = System.currentTimeMillis();
    
            // 循环发布消息1000条
            for (int i = 0; i < 1000; i++) {
                String msg = "消息" + i;
                // 发送消息
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("UTF-8"));
    
                // System.out.println("[*] 消息[" + i + "]发送成功");
            }
    
            // 结束时间
            long end = System.currentTimeMillis();
    
            System.out.println("[*] 异步发布确认用时" + (end - start) + "毫秒");
        }
    
        /**
         * 异步发布确认(处理应答失败消息)
         *
         * @throws Exception
         */
        public static void publishAsyncConfirm2() throws Exception {
            // 初始化
            init();
    
            // 获取信道
            Channel channel = getChannel();
    
            // 开启发布确认默认
            channel.confirmSelect();
    
            /*
             * 为了处理应答失败消息,定义一个线程安全有序的哈希表,适用于高并发
             * 将序号与消息关联
             * 根据序号批量删除条目
             * 支持高并发(多线程)
             */
            ConcurrentSkipListMap<Long, String> concurrentSkipListMap = new ConcurrentSkipListMap<>();
    
            // 应答成功确认回调
            ConfirmCallback ackCallback = (long deliveryTag, boolean multiple) -> {
                if (multiple) {
                    ConcurrentNavigableMap<Long, String> headMap = concurrentSkipListMap.headMap(deliveryTag);
                    headMap.clear();
                } else {
                    concurrentSkipListMap.remove(deliveryTag);
                }
    
                System.out.println("[√] 消息应答成功:" + deliveryTag);
            };
    
            // 应答失败确认回调
            ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) -> {
                System.out.println("[×] 消息应答失败:" + deliveryTag);
                System.out.println(concurrentSkipListMap);
            };
    
            // 定义发布确认监听
            channel.addConfirmListener(ackCallback, nackCallback);
    
            // 开始时间
            long start = System.currentTimeMillis();
    
            // 循环发布消息1000条
            for (int i = 0; i < 1000; i++) {
                String msg = "消息" + i;
                // 发送消息
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("UTF-8"));
    
                // 记录消息发送记录到哈希表
                concurrentSkipListMap.put(channel.getNextPublishSeqNo(), msg);
            }
    
            // 结束时间
            long end = System.currentTimeMillis();
    
            System.out.println("[*] 异步发布确认用时" + (end - start) + "毫秒");
        }
    
        /**
         * 初始化交换机、队列、绑定路由key
         *
         * @throws Exception
         */
        public static void init() throws Exception {
            // 获取信道
            Channel channel = getChannel();
    
            // 声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
            // 绑定队列、交换机、路由key
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        }
    
        public static Channel getChannel() throws Exception {
            if (channel == null) {
                setChannel(RabbitMqUtils.getChannel());
            }
    
            return channel;
        }
    
        public static void setChannel(Channel channel) {
            Producer.channel = channel;
        }
    }
    

备注

  • 该教程部分内容收集自网络,感谢原作者。

附录

标签: Java, RabbitMq

添加新评论


手机号仅后台超管可见,普通注册用户以及网站前台全站不可见,请勿担心泄露风险!