RabbitMQ-Java-07-发布确认高级
说明
- RabbitMQ-Java-07-发布确认高级
- 本案例是一个Maven+SpringBoot项目
- 假设你已经实现了上一节延迟队列
- 官方文档已包含绝大多数本案例内容。请移步:https://docs.spring.io/spring-amqp/docs/current/reference/html/
核心概念
》发布确认高级说明
发布确认高级是为了解决什么问题呢?
- 比如RabbitMq服务器宕机或重启导致数据丢失问题
主要操作有哪些呢?
- 准备一个Maven+SpringBoot项目,添加必要的Maven依赖以及RabbitMQ配置项,具体可以参考上一节延迟队列
在上一节添加了RabbitMQ配置后还需要额外增加两项配置
设置发布确认回调类型:发送消息到交换机会回调(成功失败都回调)
spring.rabbitmq.publisher-confirm-type=correlated
- none:禁用(默认值)
- correlated:发送消息到交换机会回调(成功失败都回调)
- simple:在作用域中使用,相当于同步确认机制,有关闭channel风险
开启发布返回
spring.rabbitmq.publisher-returns=true
- 新建发布确认高级-配置类
新建发布确认高级-发布确认回调组件
@Component注解注册组件,implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback
- @Autowired注解自动装配RabbitTemplate
@PostConstruct注解把当前类自动注入到全局RabbitTemplate中
- 注意:@PostConstruct必须在@Autowired之后
- 重写confirm方法:交换机确认回调(成功失败都回调)
- 重写returnedMessage方法:队列确认回调(仅送达失败时回调)
- 新建发布确认高级-消费者01
- 发布确认高级-控制器(生产者)
操作步骤
》完整代码
application.properties
spring.rabbitmq.host=192.168.3.202 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 spring.rabbitmq.publisher-confirm-type=correlated spring.rabbitmq.publisher-returns=true
MyConfirmConfig
package cn.cnyasin.rabbit.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 发布确认高级-配置类 */ @Configuration public class MyConfirmConfig { // 交换机 public static final String EXCHANGE_CONFIRM = "exchange_confirm"; // 队列 public static final String QUEUE_CONFIRM = "queue_confirm"; // 路由key public static final String ROUTING_CONFIRM = "routing_confirm"; // 声明交换机 @Bean public DirectExchange exchangeConfirm() { return ExchangeBuilder.directExchange(EXCHANGE_CONFIRM).build(); } // 声明队列 @Bean public Queue queueConfirm() { return QueueBuilder.durable(QUEUE_CONFIRM).build(); } // 绑定队列交换机路由key @Bean public Binding queueConfirmBindingExchangeConfirm( @Qualifier("queueConfirm") Queue queueConfirm, @Qualifier("exchangeConfirm") DirectExchange exchangeConfirm ) { return BindingBuilder.bind(queueConfirm).to(exchangeConfirm).with(ROUTING_CONFIRM); } }
MyConfirmCallback
package cn.cnyasin.rabbit.component; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Date; /** * 发布确认高级-发布确认回调组件 * ——@Component注解注册组件,implements RabbitTemplate.ConfirmCallback * ——@Autowired注解自动装配RabbitTemplate * ——@PostConstruct注解把当前类自动注入到全局RabbitTemplate中 * ————注意:@PostConstruct必须在@Autowired之后 * ——重写confirm方法:交换机确认回调(成功失败都回调) * ——重写returnedMessage方法:队列确认回调(仅送达失败时回调) */ @Slf4j @Component public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } /** * 交换机确认回调(成功失败都回调) * * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("[*] [{}] 确认回调成功,回调ID:{}", new Date().toString(), correlationData.getId()); } else { log.info("[*] [{}] 确认回调失败,回调ID:{},失败原因:{}", new Date().toString(), correlationData.getId(), cause); } } /** * 队列确认回调(仅送达失败时回调) * * @param returnedMessage */ @Override public void returnedMessage(ReturnedMessage returnedMessage) { int code = returnedMessage.getReplyCode(); String text = returnedMessage.getReplyText(); byte[] message = returnedMessage.getMessage().getBody(); String exchange = returnedMessage.getExchange(); String routingKey = returnedMessage.getRoutingKey(); log.info("[*] [{}] 消息未送达队列回调,错误码code:{}。原因:{}。消息:{}。交换机:{}。路由key:{}", new Date().toString(), code, text, new String(message), exchange, routingKey); } }
MyConfirmConsumer01
package cn.cnyasin.rabbit.consumer; import cn.cnyasin.rabbit.config.MyConfirmConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Date; /** * 发布确认高级-消费者01 */ @Slf4j @Component public class MyConfirmConsumer01 { @RabbitListener(queues = MyConfirmConfig.QUEUE_CONFIRM) public void receiveMessage(String message) { log.info("[*] [{}] 发布确认高级-消费者01 接收到消息:{}", new Date().toString(), message); } }
MyConfirmController
package cn.cnyasin.rabbit.controller; import cn.cnyasin.rabbit.config.MyConfirmConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; /** * 发布确认高级-控制器(生产者) */ @Slf4j @RestController @RequestMapping("/confirm") public class MyConfirmController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/send/{msg}") public String send(@PathVariable String msg) throws Exception { log.info("[*] [{}] 准备发送消息:{}", new Date().toString(), msg); CorrelationData correlationData = new CorrelationData("1"); rabbitTemplate.convertAndSend(MyConfirmConfig.EXCHANGE_CONFIRM, MyConfirmConfig.ROUTING_CONFIRM, msg + 1, correlationData); CorrelationData correlationData2 = new CorrelationData("2"); rabbitTemplate.convertAndSend(MyConfirmConfig.EXCHANGE_CONFIRM + 2, MyConfirmConfig.ROUTING_CONFIRM, msg + 2, correlationData2); CorrelationData correlationData3 = new CorrelationData("3"); rabbitTemplate.convertAndSend(MyConfirmConfig.EXCHANGE_CONFIRM, MyConfirmConfig.ROUTING_CONFIRM + 3, msg + 3, correlationData3); return "OK"; } }
备注
- 该教程部分内容收集自网络,感谢原作者。
附录
- 无