RabbitMQ-Java-02-工作队列
说明
- RabbitMQ-Java-02-工作队列
- 本案例是一个Maven项目
- 假设你已经实现了上一节简单队列
- 官方文档已包含绝大多数本案例内容。请移步:https://docs.spring.io/spring-amqp/docs/current/reference/html/
核心概念
》原理
- 执行资源密集型任务时往往有多个队列,每个队列有多个工作线程去处理
- 注意:一个消息必须保证只能被处理一次
操作步骤
》搭建环境
- idea创建一个空项目
- 创建一个Maven管理的module
pom.xml添加插件:指定JDK编译版本(为了支持lambda表达式,如果不手动添加后期idea报错根据提示会自动添加好)
<!-- 指定JDK编译版本 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin>
pom.xml添加依赖:RabbitMQ相关
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.13.1</version> </dependency> <!-- https://mvnrepository.com/artifact/commons-io/commons-io --> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.11.0</version> </dependency>
》工作队列案例(自动应答)
说明
- 主要是两步:提取工具类、分多线程同时处理一个队列
代码组成
RabbitMQ工具类:RabbitMqUtils
package cn.cnyasin.rabbit.utils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMqUtils { public static Channel getChannel() throws Exception { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 配置 factory.setHost("192.168.3.202"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); // 获取连接 Connection connection = factory.newConnection(); // 获取信道 Channel channel = connection.createChannel(); return channel; } }
初始化:Init
package cn.cnyasin.rabbit.worker; import cn.cnyasin.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; public class Init { // 交换机名 public static final String EXCHANGE_NAME = "exchange01"; // 队列名 public static final String QUEUE_NAME = "queue01"; // 路由key public static final String ROUTING_KEY = "routing01"; public static void main(String[] args) throws Exception { // 获取信道 Channel channel = RabbitMqUtils.getChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 绑定队列、交换机、路由key channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); System.out.println("初始化成功。。。"); System.exit(0); } }
工作队列01(线程一):Worker01
package cn.cnyasin.rabbit.worker; import cn.cnyasin.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Delivery; /** * 工作队列01(消费者) */ public class Worker01 { // 定义本工作队列要处理的队列名字 public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws Exception { // 获取信道 Channel channel = RabbitMqUtils.getChannel(); System.out.println("[*] 工作队列01正在等待接收消息。。。"); // 处理消息 channel.basicConsume( // 队列名 QUEUE_NAME, // 自动应答 true, // 成功处理消息回调 (String consumerTag, Delivery message) -> { // TODO 业务逻辑代码在这里 System.out.println(" [*] 成功处理消息:" + new String(message.getBody())); }, // 处理消息失败回调 (String consumerTag) -> { System.out.println("处理消息失败"); } ); } }
工作队列02(线程二):Worker02
- 方式一:将上面Worker01代码复制一份,名字改为Worker02,然后运行
方式二:idea支持同一个run方法并行运行:
- idea | Run | Edit Configurations | Allow parallel run
生产者:Task
package cn.cnyasin.rabbit.worker; import cn.cnyasin.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import java.util.Scanner; public class Task { // 交换机名 public static final String EXCHANGE_NAME = "exchange01"; // 路由key public static final String ROUTING_KEY = "routing01"; public static void main(String[] args) throws Exception { // 获取信道 Channel channel = RabbitMqUtils.getChannel(); // 接收控制台输入 Scanner scanner = new Scanner(System.in); System.out.println("[*] 等待控制台输入消息内容。。。"); while (scanner.hasNext()) { String input = scanner.next(); if (input.equals("exit")) { break; } // 发送消息 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, input.getBytes()); System.out.println("[*] 消息发送成功。。。"); } channel.close(); System.out.println("[*] 用户退出。。。"); System.exit(0); } }
运行初始化:
- Init -> main -> run
开启两个工作队列:
- Worker01 -> main -> run
- Worker02 -> main -> run
运行生产者:
- Task -> main -> run
》工作队列案例(手动应答与自动重新入队)
说明
- 为了防止消息丢失,往往都使用手动应答机制
- 如果消息处理失败,自动重新入队
- 代码还是基于上一节:工作队列案例(自动应答)
代码组成
RabbitMQ工具类:RabbitMqUtils
- 代码同:工作队列案例(自动应答)
初始化:Init
- 代码同:工作队列案例(自动应答)
工作队列01(线程一):Worker01
package cn.cnyasin.rabbit.ack; import cn.cnyasin.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Delivery; /** * 工作队列01(消费者) */ public class Worker01 { // 定义本工作队列要处理的队列名字 public static final String QUEUE_NAME = "queue_ack"; public static void main(String[] args) throws Exception { // 获取信道 Channel channel = RabbitMqUtils.getChannel(); System.out.println("[*] 工作队列01正在等待接收消息。。。"); // 处理消息 channel.basicConsume( // 队列名 QUEUE_NAME, // 自动应答 false, // 成功处理消息回调 (String consumerTag, Delivery message) -> { try { // 沉睡1秒 Thread.sleep(1000 * 1); } catch (InterruptedException e) { e.printStackTrace(); } // TODO 业务逻辑代码在这里 System.out.println(" [*] 工作队列01成功处理消息:" + new String(message.getBody())); // TODO 手动应答 channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }, // 处理消息失败回调 (String consumerTag) -> { System.out.println("处理消息失败"); } ); } }
工作队列02(线程二):Worker02
package cn.cnyasin.rabbit.ack; import cn.cnyasin.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Delivery; /** * 工作队列02(消费者) */ public class Worker02 { // 定义本工作队列要处理的队列名字 public static final String QUEUE_NAME = "queue_ack"; public static void main(String[] args) throws Exception { // 获取信道 Channel channel = RabbitMqUtils.getChannel(); System.out.println("[*] 工作队列02正在等待接收消息。。。"); // 处理消息 channel.basicConsume( // 队列名 QUEUE_NAME, // 自动应答 true, // 成功处理消息回调 (String consumerTag, Delivery message) -> { try { // 沉睡10秒 Thread.sleep(1000 * 10); } catch (InterruptedException e) { e.printStackTrace(); } // TODO 业务逻辑代码在这里 System.out.println(" [*] 工作队列02成功处理消息:" + new String(message.getBody())); // TODO 手动应答 channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }, // 处理消息失败回调 (String consumerTag) -> { System.out.println("处理消息失败"); } ); } }
生产者:Task
- 代码同:工作队列案例(自动应答)
运行初始化:
- Init -> main -> run
开启两个工作队列:
- Worker01 -> main -> run
- Worker02 -> main -> run
运行生产者:
- Task -> main -> run
》消息持久化
说明
代码借用上一节代码,在此基础上只需要在生产者(Task)代码中进行以下改动
- 发送消息的时候,第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLAIN,即可使消息持久化存储,即使RabbitMQ宕机消息也不会丢失。
核心代码
// 消息持久化 AMQP.BasicProperties properties = MessageProperties.PERSISTENT_TEXT_PLAIN; // 发送消息 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, input.getBytes());
》不公平分发
说明
代码借用上一节代码,在此基础上只需要在消费者(Worker01、Worker02)代码中进行以下改动
- 处理消息之前,信道设置一下:channel.basicQos()
- 该函数接收一个int类型值(0=公平分发,大于0=不公平分发,默认0),该值的意思是预取值,代表该工作线程一次取的消息数量
核心代码
// 设置不公平分发 int prefetchCount = 1; // 预取值 channel.basicQos(prefetchCount);
备注
- 该教程部分内容收集自网络,感谢原作者。
附录
- 无