说明

核心概念

》发布确认高级说明

  • 发布确认高级是为了解决什么问题呢?

    • 比如RabbitMq服务器宕机或重启导致数据丢失问题
  • 主要操作有哪些呢?

    • 准备一个Maven+SpringBoot项目,添加必要的Maven依赖以及RabbitMQ配置项,具体可以参考上一节延迟队列
    • 在上一节添加了RabbitMQ配置后还需要额外增加两项配置

      • 设置发布确认回调类型:发送消息到交换机会回调(成功失败都回调)

        spring.rabbitmq.publisher-confirm-type=correlated
        • none:禁用(默认值)
        • correlated:发送消息到交换机会回调(成功失败都回调)
        • simple:在作用域中使用,相当于同步确认机制,有关闭channel风险
      • 开启发布返回

        spring.rabbitmq.publisher-returns=true
    • 新建发布确认高级-配置类
    • 新建发布确认高级-发布确认回调组件

      • @Component注解注册组件,implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback

        • @Autowired注解自动装配RabbitTemplate
        • @PostConstruct注解把当前类自动注入到全局RabbitTemplate中

          • 注意:@PostConstruct必须在@Autowired之后
        • 重写confirm方法:交换机确认回调(成功失败都回调)
        • 重写returnedMessage方法:队列确认回调(仅送达失败时回调)
      • 新建发布确认高级-消费者01
      • 发布确认高级-控制器(生产者)

操作步骤

》完整代码

  • application.properties

    spring.rabbitmq.host=192.168.3.202
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123456
    spring.rabbitmq.publisher-confirm-type=correlated
    spring.rabbitmq.publisher-returns=true
  • MyConfirmConfig

    package cn.cnyasin.rabbit.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 发布确认高级-配置类
     */
    @Configuration
    public class MyConfirmConfig {
        // 交换机
        public static final String EXCHANGE_CONFIRM = "exchange_confirm";
        // 队列
        public static final String QUEUE_CONFIRM = "queue_confirm";
        // 路由key
        public static final String ROUTING_CONFIRM = "routing_confirm";
    
        // 声明交换机
        @Bean
        public DirectExchange exchangeConfirm() {
            return ExchangeBuilder.directExchange(EXCHANGE_CONFIRM).build();
        }
    
        // 声明队列
        @Bean
        public Queue queueConfirm() {
            return QueueBuilder.durable(QUEUE_CONFIRM).build();
        }
    
        // 绑定队列交换机路由key
        @Bean
        public Binding queueConfirmBindingExchangeConfirm(
                @Qualifier("queueConfirm") Queue queueConfirm,
                @Qualifier("exchangeConfirm") DirectExchange exchangeConfirm
        ) {
            return BindingBuilder.bind(queueConfirm).to(exchangeConfirm).with(ROUTING_CONFIRM);
        }
    
    }
    
  • MyConfirmCallback

    package cn.cnyasin.rabbit.component;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.ReturnedMessage;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.util.Date;
    
    /**
     * 发布确认高级-发布确认回调组件
     * ——@Component注解注册组件,implements RabbitTemplate.ConfirmCallback
     * ——@Autowired注解自动装配RabbitTemplate
     * ——@PostConstruct注解把当前类自动注入到全局RabbitTemplate中
     * ————注意:@PostConstruct必须在@Autowired之后
     * ——重写confirm方法:交换机确认回调(成功失败都回调)
     * ——重写returnedMessage方法:队列确认回调(仅送达失败时回调)
     */
    @Slf4j
    @Component
    public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init() {
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setReturnsCallback(this);
        }
    
        /**
         * 交换机确认回调(成功失败都回调)
         *
         * @param correlationData
         * @param ack
         * @param cause
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                log.info("[*] [{}] 确认回调成功,回调ID:{}", new Date().toString(), correlationData.getId());
            } else {
                log.info("[*] [{}] 确认回调失败,回调ID:{},失败原因:{}", new Date().toString(), correlationData.getId(), cause);
            }
        }
    
        /**
         * 队列确认回调(仅送达失败时回调)
         *
         * @param returnedMessage
         */
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            int code = returnedMessage.getReplyCode();
            String text = returnedMessage.getReplyText();
            byte[] message = returnedMessage.getMessage().getBody();
            String exchange = returnedMessage.getExchange();
            String routingKey = returnedMessage.getRoutingKey();
            log.info("[*] [{}] 消息未送达队列回调,错误码code:{}。原因:{}。消息:{}。交换机:{}。路由key:{}",
                    new Date().toString(), code, text, new String(message), exchange, routingKey);
        }
    }
    
  • MyConfirmConsumer01

    package cn.cnyasin.rabbit.consumer;
    
    import cn.cnyasin.rabbit.config.MyConfirmConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     * 发布确认高级-消费者01
     */
    @Slf4j
    @Component
    public class MyConfirmConsumer01 {
        @RabbitListener(queues = MyConfirmConfig.QUEUE_CONFIRM)
        public void receiveMessage(String message) {
            log.info("[*] [{}] 发布确认高级-消费者01 接收到消息:{}", new Date().toString(), message);
        }
    
    }
    
  • MyConfirmController

    package cn.cnyasin.rabbit.controller;
    
    import cn.cnyasin.rabbit.config.MyConfirmConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.Date;
    
    /**
     * 发布确认高级-控制器(生产者)
     */
    @Slf4j
    @RestController
    @RequestMapping("/confirm")
    public class MyConfirmController {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @RequestMapping("/send/{msg}")
        public String send(@PathVariable String msg) throws Exception {
            log.info("[*] [{}] 准备发送消息:{}", new Date().toString(), msg);
    
            CorrelationData correlationData = new CorrelationData("1");
            rabbitTemplate.convertAndSend(MyConfirmConfig.EXCHANGE_CONFIRM, MyConfirmConfig.ROUTING_CONFIRM, msg + 1, correlationData);
    
            CorrelationData correlationData2 = new CorrelationData("2");
            rabbitTemplate.convertAndSend(MyConfirmConfig.EXCHANGE_CONFIRM + 2, MyConfirmConfig.ROUTING_CONFIRM, msg + 2, correlationData2);
    
            CorrelationData correlationData3 = new CorrelationData("3");
            rabbitTemplate.convertAndSend(MyConfirmConfig.EXCHANGE_CONFIRM, MyConfirmConfig.ROUTING_CONFIRM + 3, msg + 3, correlationData3);
    
            return "OK";
        }
    }
    

备注

  • 该教程部分内容收集自网络,感谢原作者。

附录

标签: Java, RabbitMq

添加新评论


手机号仅后台超管可见,普通注册用户以及网站前台全站不可见,请勿担心泄露风险!