说明

操作步骤

》安装RabbitMQ

  • 可自行安装,也可参考我的相关教程(CentOS7离线安装RabbitMq),本章假设你已经安装好了RabbitMQ。

》搭建环境

  • idea创建一个空项目
  • 创建一个Maven管理的module
  • pom.xml添加插件:指定JDK编译版本(为了支持lambda表达式,如果不手动添加后期idea报错根据提示会自动添加好)

    <!-- 指定JDK编译版本 -->
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
            <source>8</source>
            <target>8</target>
        </configuration>
    </plugin>
  • pom.xml添加依赖:RabbitMQ相关

    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.13.1</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.11.0</version>
    </dependency>
    

》简单案例

  • 说明

    • 我将代码分成三部分:初始化、消费者、生产者。多一层拆分思路更清晰明朗便于理解。
  • 代码组成

    • 初始化类:Initialization

      package cn.cnyasin.rabbit.hello;
      
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      
      /*
       * 初始化
       *  作用:
       *      创建交换机
       *      创建队列
       *      绑定队列、交换机、路由key
       */
      public class Initialization {
          // 交换机名
          public static final String EXCHANGE_NAME = "exchange01";
      
          // 队列名
          public static final String QUEUE_NAME = "queue01";
      
          // 路由key
          public static final String ROUTING_KEY = "routing01";
      
          public static void main(String[] args) throws Exception {
              // 创建连接工厂
              ConnectionFactory factory = new ConnectionFactory();
      
              // 配置
              factory.setHost("192.168.3.202");
              factory.setPort(5672);
              factory.setUsername("admin");
              factory.setPassword("123456");
              factory.setVirtualHost("/");
      
              // 获取连接
              Connection connection = factory.newConnection();
      
              // 获取信道
              Channel channel = connection.createChannel();
      
              // 声明交换机
              channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
      
              // 声明队列
              channel.queueDeclare(QUEUE_NAME, true, false, false, null);
      
              // 绑定队列、交换机、路由key
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
      
              connection.close();
      
              System.out.println("初始化成功。。。");
          }
      }
      
    • 消费者类:Consumer

      package cn.cnyasin.rabbit.hello;
      
      import com.rabbitmq.client.*;
      
      /*
       * 消费者
       *  消费者消费消息流程
       *      建立连接(connection)
       *      获取信道(channel)
       *      消费队列中的消息,自动应答
       *          接收消息回调方法
       *          拒绝消息回调方法
       */
      public class Consumer {
      
          // 队列名
          public static final String QUEUE_NAME = "queue01";
      
          public static void main(String[] args) throws Exception {
              // 创建连接工厂
              ConnectionFactory factory = new ConnectionFactory();
      
              factory.setHost("192.168.3.202");
              factory.setPort(5672);
              factory.setUsername("admin");
              factory.setPassword("123456");
              factory.setVirtualHost("/");
      
              // 创建连接
              Connection connection = factory.newConnection();
      
              // 获取信道
              Channel channel = connection.createChannel();
      
              // 消费队列中的消息
              channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
          }
      
          // 接收消息回调方法
          public static DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
              System.out.println(" [*] 成功处理消息:" + new String(message.getBody()));
          };
      
          // 拒绝消息回调方法
          public static CancelCallback cancelCallback = (String consumerTag) -> {
              System.out.println("消费消息失败");
          };
      }
    • 生产者类:Producer

      package cn.cnyasin.rabbit.hello;
      
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      
      /*
       * 生产者
       *  发送消息流程:
       *      建立连接(connection)
       *      获取信道(channel)
       *      通过信道将消息发送到指定交换机(exchange),并绑定路由key(routingKey),路由key可以是多个
       *  注意:
       *      生产者不需要关心队列(queue)
       *      生产者发送消息前需要准备好:
       *          创建相关交换机
       *          创建相关队列
       *          绑定队列、交换机、路由key
       */
      public class Producer {
      
          // 交换机名
          public static final String EXCHANGE_NAME = "exchange01";
      
          // 路由key
          public static final String ROUTING_KEY = "routing01";
      
          public static void main(String[] args) throws Exception {
              // 创建连接工厂
              ConnectionFactory factory = new ConnectionFactory();
      
              // 配置信息
              factory.setHost("192.168.3.202");
              factory.setPort(5672);
              factory.setUsername("admin");
              factory.setPassword("123456");
              factory.setVirtualHost("/");
      
              // 创建连接
              Connection connection = factory.newConnection();
      
              // 获取信道
              Channel channel = connection.createChannel();
      
              // 发送消息
              channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, "hello.".getBytes());
      
              connection.close();
      
              System.out.println("消息发送成功。。。");
          }
      }
  • 运行初始化:Initialization -> main -> run
  • 运行消费者:Consumer -> main -> run
  • 运行生产者:Producer -> main -> run

备注

  • 该教程部分内容收集自网络,感谢原作者。

附录

标签: Java, RabbitMq

添加新评论


手机号仅后台超管可见,普通注册用户以及网站前台全站不可见,请勿担心泄露风险!