RabbitMQ-Java-06-延迟队列
说明
- RabbitMQ-Java-06-延迟队列
- 本案例是一个Maven+SpringBoot项目
- 假设你已经实现了上一节死信队列
- 官方文档已包含绝大多数本案例内容。请移步:https://docs.spring.io/spring-amqp/docs/current/reference/html/
核心概念
》延迟队列说明
实现方式一:
- 基于直接交换机的普通队列,给队列设置TTL
该方式缺点:
- 每个队列的TTL都是固定的,要想不同的TTL只能设置多个TTL,明显不符合我们的需求
实现方式二:
- 基于直接交换机的普通队列,消息发送方设置消息的TTL
该方式缺点:
- 多个不同TTL的消息发送到同一个队列,队列还是会依次执行消息,导致后边的TTL小的消息比他前边的TTL大的消息还要延后执行,明显不符合我们的需求
实现方式三(重点掌握):
- 基于插件:{rabbitmq_delayed_message_exchange},声明交换机的时候指定为{x-delayed-message}类型的交换机
发消息的时候设置好消息特殊消息头:{x-delay}
- 文档及下载地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
安装
- 将下载的插件拷贝至RabbitMq插件目录(默认:/usr/lib/rabbitmq/plugins),如果没有就新建之
开启插件:
[admin@host001 plugins]$ pwd /usr/lib/rabbitmq/plugins # 重命名 [admin@host001 plugins]$ sudo mv rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq_delayed_message_exchange.ez [admin@host001 plugins]$ ll total 36 -rw-r--r--. 1 root root 36358 Dec 26 08:52 rabbitmq_delayed_message_exchange.ez # 开启 [admin@host001 plugins]$ sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange Enabling plugins on node rabbit@host001: rabbitmq_delayed_message_exchange The following plugins have been configured: rabbitmq_delayed_message_exchange rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@host001... The following plugins have been enabled: rabbitmq_delayed_message_exchange started 1 plugins. # 重启RabbitMq [admin@host001 plugins]$ sudo service rabbitmq-server restart Redirecting to /bin/systemctl restart rabbitmq-server.service # 重新RabbitMq登录后台,新建交换机Type选项多了:x-delayed-message,表示插件开启成功
该方式的优点:
- 同一个队列可以接收任意delay的消息,处理消息的时候会根据消息的delay时间自动处理排序,nice~
》延迟队列实现过程概览
- idea新建SpringBoot类型module
添加Maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.76</version> </dependency>
在默认配置文件中添加RabbitMq配置项
spring.rabbitmq.host=192.168.3.202 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456
- 开始搞事情
操作步骤
》实现方式一
延迟队列自动配置类:DelayQueueConfig
package cn.cnyasin.rabbit.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * TTL队列自动配置类 */ @Configuration public class DelayQueueConfig { // 定义交换机名 public static final String EXCHANGE_NORMAL = "exchange_normal"; public static final String EXCHANGE_DEAD = "exchange_dead"; // 定义队列名 public static final String QUEUE_NORMAL_1 = "queue_normal_1"; public static final String QUEUE_NORMAL_2 = "queue_normal_2"; public static final String QUEUE_DEAD = "queue_dead"; // 定义路由key public static final String ROUTING_NORMAL_1 = "routing_normal_1"; public static final String ROUTING_NORMAL_2 = "routing_normal_2"; public static final String ROUTING_DEAD = "routing_dead"; // 声明交换机 @Bean public DirectExchange ExchangeNormal() { return new DirectExchange(EXCHANGE_NORMAL); } @Bean public DirectExchange ExchangeDead() { return new DirectExchange(EXCHANGE_DEAD); } // 声明队列 @Bean public Queue QueueNormal1() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD); arguments.put("x-dead-letter-routing-key", ROUTING_DEAD); arguments.put("x-message-ttl", 10000); return new Queue(QUEUE_NORMAL_1, true, false, false, arguments); } @Bean public Queue QueueNormal2() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD); arguments.put("x-dead-letter-routing-key", ROUTING_DEAD); arguments.put("x-message-ttl", 30000); return new Queue(QUEUE_NORMAL_2, true, false, false, arguments); } @Bean public Queue QueueDead() { return new Queue(QUEUE_DEAD, true, false, false, null); } // 绑定队列、交换机、路由key @Bean public Binding QueueNormal1BindingExchangeNormal( @Qualifier("QueueNormal1") Queue queueNormal1, @Qualifier("ExchangeNormal") Exchange exchangeNormal ) { return BindingBuilder.bind(queueNormal1).to(exchangeNormal).with(ROUTING_NORMAL_1).noargs(); } @Bean public Binding QueueNormal2BindingExchangeNormal( @Qualifier("QueueNormal2") Queue queueNormal2, @Qualifier("ExchangeNormal") Exchange exchangeNormal ) { return BindingBuilder.bind(queueNormal2).to(exchangeNormal).with(ROUTING_NORMAL_2).noargs(); } @Bean public Binding QueueDeadBindingExchangeDead( @Qualifier("QueueDead") Queue queueDead, @Qualifier("ExchangeDead") Exchange exchangeDead ) { return BindingBuilder.bind(queueDead).to(exchangeDead).with(ROUTING_DEAD).noargs(); } }
延迟队列消费者组件:DelayQueueConsumer
package cn.cnyasin.rabbit.consumer; import cn.cnyasin.rabbit.config.DelayQueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Date; /** * 延迟队列消费者组件 */ @Slf4j @Component public class DelayQueueConsumer { /** * 队列TTL-消费者 * * @param data */ @RabbitListener(queues = DelayQueueConfig.QUEUE_DEAD) public void ddlQueueConsumer(String data) { log.info("[*] [{}] 死信队列接收到消息:{}", new Date().toString(), data); } }
延迟队列控制器:DelayQueueController
package cn.cnyasin.rabbit.controller; import cn.cnyasin.rabbit.config.DelayQueueConfig; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * 延迟队列控制器 */ @Slf4j @RestController @RequestMapping("/delay/queue") public class DelayQueueController { @Autowired RabbitTemplate rabbitTemplate; /** * 队列TTL-生产者 * * @param msg * @return */ @RequestMapping("/ttl/queue/producer/{msg}") public String TtlQueueProducer(@PathVariable String msg) { log.info("[*] 准备发送消息:{}", msg); rabbitTemplate.convertAndSend(DelayQueueConfig.EXCHANGE_NORMAL, DelayQueueConfig.ROUTING_NORMAL_1, msg); rabbitTemplate.convertAndSend(DelayQueueConfig.EXCHANGE_NORMAL, DelayQueueConfig.ROUTING_NORMAL_2, msg); return JSON.toJSONString("消息发送成功"); } }
》实现方式二
延迟队列自动配置类:DelayQueueConfig(代码基于实现方式一,以下是新增的部分)
// 定义队列名 public static final String QUEUE_NORMAL_3 = "queue_normal_3"; // 定义路由key public static final String ROUTING_NORMAL_3 = "routing_normal_3"; @Bean public Queue QueueNormal3() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD); arguments.put("x-dead-letter-routing-key", ROUTING_DEAD); return new Queue(QUEUE_NORMAL_3, true, false, false, arguments); } @Bean public Binding QueueNormal3BindingExchangeNormal( @Qualifier("QueueNormal3") Queue queueNormal3, @Qualifier("ExchangeNormal") Exchange exchangeNormal ) { return BindingBuilder.bind(queueNormal3).to(exchangeNormal).with(ROUTING_NORMAL_3).noargs(); }
延迟队列控制器:DelayQueueController(代码基于实现方式一,以下是新增的部分)
/** * 消息TTL-生产者 * * @param msg * @return */ @RequestMapping("/ttl/message/producer/{msg}/{ttl}") public String TtlMessageProducer(@PathVariable String msg, @PathVariable int ttl) throws Exception { log.info("[*] [{}]准备发送消息:{}", new Date().toString(), msg); Message message = MessageBuilder.withBody(msg.getBytes("UTF-8")) .setExpiration(String.valueOf(ttl)) .build(); rabbitTemplate.convertAndSend(DelayQueueConfig.EXCHANGE_NORMAL, DelayQueueConfig.ROUTING_NORMAL_3, message); return JSON.toJSONString("消息发送成功"); }
》实现方式三
延迟队列自动配置类:DelayQueueConfig(代码基于实现方式一,以下是新增的部分)
// 定义交换机名 public static final String EXCHANGE_DELAY = "exchange_delay"; // 定义队列名 public static final String QUEUE_DELAY = "queue_delay"; // 定义路由key public static final String ROUTING_DELAY = "routing_delay"; // 声明交换机 @Bean public CustomExchange ExchangeDelay() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", "direct"); return new CustomExchange(EXCHANGE_DELAY, "x-delayed-message", true, false, arguments); } // 声明队列 @Bean public Queue QueueDelay() { return new Queue(QUEUE_DELAY, true, false, false, null); } // 绑定队列、交换机、路由key @Bean public Binding QueueDelayBindingExchangeDelay( @Qualifier("QueueDelay") Queue queueDelay, @Qualifier("ExchangeDelay") Exchange exchangeDelay ) { return BindingBuilder.bind(queueDelay).to(exchangeDelay).with(ROUTING_DELAY).noargs(); }
延迟队列消费者组件:DelayQueueConsumer(代码基于实现方式一,以下是新增的部分)
/** * delay队列-消费者 * * @param data */ @RabbitListener(queues = DelayQueueConfig.QUEUE_DELAY) public void delayQueueConsumer(String data) { log.info("[*] [{}] 延迟队列接收到消息:{}", new Date().toString(), data); }
延迟队列控制器:DelayQueueController(代码基于实现方式一,以下是新增的部分)
/** * delay消息-生产者 * * @param msg * @return */ @RequestMapping("/delay/message/producer/{msg}/{ttl}") public String DelayMessageProducer(@PathVariable String msg, @PathVariable int ttl) throws Exception { log.info("[*] [{}]准备发送消息:{}", new Date().toString(), msg); Message message = MessageBuilder.withBody(msg.getBytes("UTF-8")) .setHeader("x-delay", String.valueOf(ttl)) .build(); rabbitTemplate.convertAndSend(DelayQueueConfig.EXCHANGE_DELAY, DelayQueueConfig.ROUTING_DELAY, message); return JSON.toJSONString("消息发送成功"); }
备注
- 该教程部分内容收集自网络,感谢原作者。
附录
- 无