说明

核心概念

》延迟队列说明

  • 实现方式一:

    • 基于直接交换机的普通队列,给队列设置TTL
    • 该方式缺点:

      • 每个队列的TTL都是固定的,要想不同的TTL只能设置多个TTL,明显不符合我们的需求
  • 实现方式二:

    • 基于直接交换机的普通队列,消息发送方设置消息的TTL
    • 该方式缺点:

      • 多个不同TTL的消息发送到同一个队列,队列还是会依次执行消息,导致后边的TTL小的消息比他前边的TTL大的消息还要延后执行,明显不符合我们的需求
  • 实现方式三(重点掌握)

    • 基于插件:{rabbitmq_delayed_message_exchange},声明交换机的时候指定为{x-delayed-message}类型的交换机
    • 发消息的时候设置好消息特殊消息头:{x-delay}

      • 文档及下载地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
      • 安装

        • 将下载的插件拷贝至RabbitMq插件目录(默认:/usr/lib/rabbitmq/plugins),如果没有就新建之
        • 开启插件:

          [admin@host001 plugins]$ pwd
          /usr/lib/rabbitmq/plugins
          
          # 重命名
          [admin@host001 plugins]$ sudo mv rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq_delayed_message_exchange.ez
          [admin@host001 plugins]$ ll
          total 36
          -rw-r--r--. 1 root root 36358 Dec 26 08:52 rabbitmq_delayed_message_exchange.ez
          
          # 开启
          [admin@host001 plugins]$ sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange
          Enabling plugins on node rabbit@host001:
          rabbitmq_delayed_message_exchange
          The following plugins have been configured:
            rabbitmq_delayed_message_exchange
            rabbitmq_management
            rabbitmq_management_agent
            rabbitmq_web_dispatch
          Applying plugin configuration to rabbit@host001...
          The following plugins have been enabled:
            rabbitmq_delayed_message_exchange
          
          started 1 plugins.
          
          # 重启RabbitMq
          [admin@host001 plugins]$ sudo service rabbitmq-server restart
          Redirecting to /bin/systemctl restart rabbitmq-server.service
          
          # 重新RabbitMq登录后台,新建交换机Type选项多了:x-delayed-message,表示插件开启成功
          
    • 该方式的优点:

      • 同一个队列可以接收任意delay的消息,处理消息的时候会根据消息的delay时间自动处理排序,nice~

》延迟队列实现过程概览

  • idea新建SpringBoot类型module
  • 添加Maven依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.76</version>
    </dependency>
    
  • 在默认配置文件中添加RabbitMq配置项

    spring.rabbitmq.host=192.168.3.202
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123456
    
  • 开始搞事情

操作步骤

》实现方式一

  • 延迟队列自动配置类:DelayQueueConfig

    package cn.cnyasin.rabbit.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * TTL队列自动配置类
     */
    @Configuration
    public class DelayQueueConfig {
        // 定义交换机名
        public static final String EXCHANGE_NORMAL = "exchange_normal";
        public static final String EXCHANGE_DEAD = "exchange_dead";
    
        // 定义队列名
        public static final String QUEUE_NORMAL_1 = "queue_normal_1";
        public static final String QUEUE_NORMAL_2 = "queue_normal_2";
        public static final String QUEUE_DEAD = "queue_dead";
    
        // 定义路由key
        public static final String ROUTING_NORMAL_1 = "routing_normal_1";
        public static final String ROUTING_NORMAL_2 = "routing_normal_2";
        public static final String ROUTING_DEAD = "routing_dead";
    
        // 声明交换机
        @Bean
        public DirectExchange ExchangeNormal() {
            return new DirectExchange(EXCHANGE_NORMAL);
        }
    
        @Bean
        public DirectExchange ExchangeDead() {
            return new DirectExchange(EXCHANGE_DEAD);
        }
    
        // 声明队列
        @Bean
        public Queue QueueNormal1() {
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD);
            arguments.put("x-dead-letter-routing-key", ROUTING_DEAD);
            arguments.put("x-message-ttl", 10000);
            return new Queue(QUEUE_NORMAL_1, true, false, false, arguments);
        }
    
        @Bean
        public Queue QueueNormal2() {
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD);
            arguments.put("x-dead-letter-routing-key", ROUTING_DEAD);
            arguments.put("x-message-ttl", 30000);
            return new Queue(QUEUE_NORMAL_2, true, false, false, arguments);
        }
    
        @Bean
        public Queue QueueDead() {
            return new Queue(QUEUE_DEAD, true, false, false, null);
        }
    
        // 绑定队列、交换机、路由key
        @Bean
        public Binding QueueNormal1BindingExchangeNormal(
                @Qualifier("QueueNormal1") Queue queueNormal1,
                @Qualifier("ExchangeNormal") Exchange exchangeNormal
        ) {
            return BindingBuilder.bind(queueNormal1).to(exchangeNormal).with(ROUTING_NORMAL_1).noargs();
        }
    
        @Bean
        public Binding QueueNormal2BindingExchangeNormal(
                @Qualifier("QueueNormal2") Queue queueNormal2,
                @Qualifier("ExchangeNormal") Exchange exchangeNormal
        ) {
            return BindingBuilder.bind(queueNormal2).to(exchangeNormal).with(ROUTING_NORMAL_2).noargs();
        }
    
        @Bean
        public Binding QueueDeadBindingExchangeDead(
                @Qualifier("QueueDead") Queue queueDead,
                @Qualifier("ExchangeDead") Exchange exchangeDead
        ) {
            return BindingBuilder.bind(queueDead).to(exchangeDead).with(ROUTING_DEAD).noargs();
        }
    
    }
    
  • 延迟队列消费者组件:DelayQueueConsumer

    package cn.cnyasin.rabbit.consumer;
    
    import cn.cnyasin.rabbit.config.DelayQueueConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     * 延迟队列消费者组件
     */
    @Slf4j
    @Component
    public class DelayQueueConsumer {
        /**
         * 队列TTL-消费者
         *
         * @param data
         */
        @RabbitListener(queues = DelayQueueConfig.QUEUE_DEAD)
        public void ddlQueueConsumer(String data) {
            log.info("[*] [{}] 死信队列接收到消息:{}", new Date().toString(), data);
        }
    }
    
  • 延迟队列控制器:DelayQueueController

    package cn.cnyasin.rabbit.controller;
    
    import cn.cnyasin.rabbit.config.DelayQueueConfig;
    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * 延迟队列控制器
     */
    @Slf4j
    @RestController
    @RequestMapping("/delay/queue")
    public class DelayQueueController {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        /**
         * 队列TTL-生产者
         *
         * @param msg
         * @return
         */
        @RequestMapping("/ttl/queue/producer/{msg}")
        public String TtlQueueProducer(@PathVariable String msg) {
            log.info("[*] 准备发送消息:{}", msg);
    
            rabbitTemplate.convertAndSend(DelayQueueConfig.EXCHANGE_NORMAL, DelayQueueConfig.ROUTING_NORMAL_1, msg);
            rabbitTemplate.convertAndSend(DelayQueueConfig.EXCHANGE_NORMAL, DelayQueueConfig.ROUTING_NORMAL_2, msg);
    
            return JSON.toJSONString("消息发送成功");
        }
    }
    

》实现方式二

  • 延迟队列自动配置类:DelayQueueConfig(代码基于实现方式一,以下是新增的部分)

    // 定义队列名
    public static final String QUEUE_NORMAL_3 = "queue_normal_3";
    
    // 定义路由key
    public static final String ROUTING_NORMAL_3 = "routing_normal_3";
    
    
    @Bean
    public Queue QueueNormal3() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD);
        arguments.put("x-dead-letter-routing-key", ROUTING_DEAD);
        return new Queue(QUEUE_NORMAL_3, true, false, false, arguments);
    }
    
    @Bean
    public Binding QueueNormal3BindingExchangeNormal(
        @Qualifier("QueueNormal3") Queue queueNormal3,
        @Qualifier("ExchangeNormal") Exchange exchangeNormal
    ) {
        return BindingBuilder.bind(queueNormal3).to(exchangeNormal).with(ROUTING_NORMAL_3).noargs();
    }
  • 延迟队列控制器:DelayQueueController(代码基于实现方式一,以下是新增的部分)

    /**
     * 消息TTL-生产者
     *
     * @param msg
     * @return
     */
    @RequestMapping("/ttl/message/producer/{msg}/{ttl}")
    public String TtlMessageProducer(@PathVariable String msg, @PathVariable int ttl) throws Exception {
        log.info("[*] [{}]准备发送消息:{}", new Date().toString(), msg);
    
        Message message = MessageBuilder.withBody(msg.getBytes("UTF-8"))
            .setExpiration(String.valueOf(ttl))
            .build();
    
        rabbitTemplate.convertAndSend(DelayQueueConfig.EXCHANGE_NORMAL, DelayQueueConfig.ROUTING_NORMAL_3, message);
    
        return JSON.toJSONString("消息发送成功");
    }

》实现方式三

  • 延迟队列自动配置类:DelayQueueConfig(代码基于实现方式一,以下是新增的部分)

    // 定义交换机名
    public static final String EXCHANGE_DELAY = "exchange_delay";
    // 定义队列名
    public static final String QUEUE_DELAY = "queue_delay";
    // 定义路由key
    public static final String ROUTING_DELAY = "routing_delay";
    
    // 声明交换机
    @Bean
    public CustomExchange ExchangeDelay() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(EXCHANGE_DELAY, "x-delayed-message", true, false, arguments);
    }
    
    // 声明队列
    @Bean
    public Queue QueueDelay() {
        return new Queue(QUEUE_DELAY, true, false, false, null);
    }
    
    // 绑定队列、交换机、路由key
    @Bean
    public Binding QueueDelayBindingExchangeDelay(
        @Qualifier("QueueDelay") Queue queueDelay,
        @Qualifier("ExchangeDelay") Exchange exchangeDelay
    ) {
        return BindingBuilder.bind(queueDelay).to(exchangeDelay).with(ROUTING_DELAY).noargs();
    }
    
  • 延迟队列消费者组件:DelayQueueConsumer(代码基于实现方式一,以下是新增的部分)

    /**
     * delay队列-消费者
     *
     * @param data
     */
    @RabbitListener(queues = DelayQueueConfig.QUEUE_DELAY)
    public void delayQueueConsumer(String data) {
        log.info("[*] [{}] 延迟队列接收到消息:{}", new Date().toString(), data);
    }
    
  • 延迟队列控制器:DelayQueueController(代码基于实现方式一,以下是新增的部分)

    /**
     * delay消息-生产者
     *
     * @param msg
     * @return
     */
    @RequestMapping("/delay/message/producer/{msg}/{ttl}")
    public String DelayMessageProducer(@PathVariable String msg, @PathVariable int ttl) throws Exception {
        log.info("[*] [{}]准备发送消息:{}", new Date().toString(), msg);
    
        Message message = MessageBuilder.withBody(msg.getBytes("UTF-8"))
                .setHeader("x-delay", String.valueOf(ttl))
                .build();
    
        rabbitTemplate.convertAndSend(DelayQueueConfig.EXCHANGE_DELAY, DelayQueueConfig.ROUTING_DELAY, message);
    
        return JSON.toJSONString("消息发送成功");
    }
    

备注

  • 该教程部分内容收集自网络,感谢原作者。

附录

标签: Java, RabbitMq

添加新评论


手机号仅后台超管可见,普通注册用户以及网站前台全站不可见,请勿担心泄露风险!