说明

  • RabbitMQ-PHP入门
  • 环境:CentOS7+PHP7.3

操作步骤

》安装RabbitMQ

  • 可自行安装,也可参考我的相关教程(CentOS7离线安装RabbitMq),本章假设你已经安装好了RabbitMQ。

》PHP安装AMQP扩展

  • 下载地址:https://pecl.php.net/package/amqp
  • 大小:110k+-

    amqp-1.10.2.tgz
  • 安装AMQP扩展需要先安装rabbitmq-c,否则安装不上

  • 将下载好的压缩包放入CentOS7能访问到的目录

    [admin@192 src]$ ll
    -rw-r--r-- 1 501 games   107350 Dec 19 23:13 amqp-1.10.2.tgz
    -rw-r--r-- 1 501 games   145361 Dec 19 23:51 rabbitmq-c-0.10.0.tar.gz
  • 安装rabbitmq-c

    [admin@192 src]$ tar zxvf rabbitmq-c-0.10.0.tar.gz
    [admin@192 src]$ cd rabbitmq-c-0.10.0
    [admin@192 rabbitmq-c-0.10.0]$ cmake . -DCMAKE_INSTALL_PREFIX=/usr/local/rabbitmq-c-0.10.0
    [admin@192 rabbitmq-c-0.10.0]$ make
    # 非root这一步需要加sudo
    [admin@192 rabbitmq-c-0.10.0]$ sudo make install
    
  • 安装AMQP

    [admin@192 src]$ tar zxvf amqp-1.10.2.tgz
    [admin@192 src]$ cd amqp-1.10.2
    
    # 生成编译文件
    [admin@192 amqp-1.10.2]$ phpize
    Configuring for:
    PHP Api Version:         20180731
    Zend Module Api No:      20180731
    Zend Extension Api No:   320180731
    
    # 查看本机php-config位置
    [admin@192 amqp-1.10.2]$ sudo find / -name php-config
    /usr/local/php/bin/php-config
    
    # 配置
    [admin@192 amqp-1.10.2]$ ./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c-0.10.0
    
    # make
    [admin@192 amqp-1.10.2]$ make
    
    # make 报错
    /usr/bin/ld: cannot find -lrabbitmq
    collect2: error: ld returned 1 exit status
    make: *** [amqp.la] Error 1
    
    # 复制/usr/local/rabbitmq-c-0.10.0/lib64一份到/usr/local/rabbitmq-c-0.10.0/lib
    [admin@192 amqp-1.10.2]$ cd /usr/local/rabbitmq-c-0.10.0/
    [admin@192 rabbitmq-c-0.10.0]$ ll
    total 0
    drwxr-xr-x 2 root root  92 Dec 20 00:54 include
    drwxr-xr-x 3 root root 118 Dec 20 00:54 lib64
    [admin@192 rabbitmq-c-0.10.0]$ sudo cp -R ./lib64 ./lib
    
    # 返回amqp-1.10.2的解压缩目录继续make
    [admin@192 amqp-1.10.2]$ make
    ----------------------------------------------------------------------
    Libraries have been installed in:
       /usr/local/src/RabbitMQ/amqp-1.10.2/modules
    
    If you ever happen to want to link against installed libraries
    in a given directory, LIBDIR, you must either use libtool, and
    specify the full pathname of the library, or use the `-LLIBDIR'
    flag during linking and do at least one of the following:
       - add LIBDIR to the `LD_LIBRARY_PATH' environment variable
         during execution
       - add LIBDIR to the `LD_RUN_PATH' environment variable
         during linking
       - use the `-Wl,--rpath -Wl,LIBDIR' linker flag
       - have your system administrator add LIBDIR to `/etc/ld.so.conf'
    
    See any operating system documentation about shared libraries for
    more information, such as the ld(1) and ld.so(8) manual pages.
    ----------------------------------------------------------------------
    
    Build complete.
    Don't forget to run 'make test'.
    
    
    # make install,非root这一步需要加sudo
    [admin@192 amqp-1.10.2]$ sudo make install
    Installing shared extensions:     /usr/local/php/lib/php/extensions/no-debug-non-zts-20180731/
    
    # 查看扩展
    [admin@192 amqp-1.10.2]$ ll /usr/local/php/lib/php/extensions/no-debug-non-zts-20180731/
    total 9672
    -rwxr-xr-x  1 root root  656864 Dec 20 01:03 amqp.so
    
    # php.ini开启扩展
    [admin@192 amqp-1.10.2]$ sudo find / -name php.ini
    /usr/local/php/etc/php.ini
    [admin@192 amqp-1.10.2]$ sudo vi /usr/local/php/etc/php.ini
    extension=amqp
    [admin@192 amqp-1.10.2]$ php -m
    [PHP Modules]
    amqp
    
    # 重启php-fpm
    [admin@192 amqp-1.10.2]$ sudo lnmp php-fpm restart
    

》简单PHP案例

  • 说明

    • 纯原声PHP代码实现方式,不借助三方包,只要PHP安装好amqp扩展即可。
  • 代码组成

    • 消费者代码:RabbitmqConsumer.php

      <?php
      
      //配置信息
      $conf = [
          'host' => '192.168.3.202',
          'port' => '5672',
          'login' => 'admin',
          'password' => '123456',
          'vhost' => '/'
      ];
      
      // 交换机名字
      $exchangeName = 'exchange001';
      
      // 路由key名字
      $routingKey = 'route001';
      
      // 队列名字
      $queueName001 = 'exchange001-queue001';
      $queueName002 = 'exchange001-queue002';
      
      // 创建connection
      $connection = new \AMQPConnection($conf);
      $connection->connect();
      
      // 创建channel
      $channel = new \AMQPChannel($connection);
      
      // 初始化一个交换机并创建
      $exchange = new \AMQPExchange($channel);
      $exchange->setName($exchangeName);          // 设置交换机名字
      $exchange->setType(AMQP_EX_TYPE_DIRECT);    // 设置类型:直接交换类型
      $exchange->setFlags(AMQP_DURABLE);          // 设置标识:持久交换机
      $res = $exchange->declareExchange();        // 声明交换机
      
      // 创建队列001
      $queue001 = new \AMQPQueue($channel);
      $queue001->setName($queueName001);
      $queue001->setFlags(AMQP_DURABLE);
      $res = $queue001->declareQueue();
      
      // 队列001绑定到交换机,并指定路由key
      $res = $queue001->bind($exchangeName, $routingKey);
      
      // 消费消息
      echo "正在接收消息:\n";
      // consume方法:是一个阻塞方法,有数据执行,没数据则一直阻塞状态
      // $queue->consume('callbackFunction', AMQP_AUTOACK);  // 自动应答
      $queue001->consume('callbackFunction');
      
      // get方法:是consume的非阻塞替代方法
      // $msg = $queue->get(AMQP_AUTOACK);
      
      // 创建队列002
      /**
       * 注意:由于consume方法是阻塞模式,所以之后的代码就不再执行了,故而队列002的代码不会执行
       */
      /*
      $queue002 = new \AMQPQueue($channel);
      $queue002->setName($queueName002);
      $queue002->setFlags(AMQP_DURABLE);
      $res = $queue002->declareQueue();
      
      // 队列002绑定到交换机,并指定路由key
      $res = $queue002->bind($exchangeName, $routingKey);
      
      // 消费消息
      $queue002->consume('callbackFunction');
      */
      
      // 消费消息回调方法
      function callbackFunction(\AMQPEnvelope $envelope, \AMQPQueue $queue)
      {
          $msg = $envelope->getBody();
          // 业务逻辑
          echo '[' . date('YmdHis') . '] 队列:' . $queue->getName() . ' 成功处理了一条消息:' . $msg . "\n"; //处理消息
          $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
      }
    • 生产者代码:RabbitmqProducer.php

      <?php
      
      //配置信息
      $conf = [
          'host' => '192.168.3.202',
          'port' => '5672',
          'login' => 'admin',
          'password' => '123456',
          'vhost' => '/'
      ];
      
      // 交换机名字
      $exchangeName = 'exchange001';
      
      // 路由key名字
      $routingKey = 'route001';
      
      // 创建connection
      $connection = new \AMQPConnection($conf);
      $connection->connect();
      
      // 创建channel
      $channel = new \AMQPChannel($connection);
      
      // 创建交换机
      $exchange = new \AMQPExchange($channel);
      $exchange->setName($exchangeName);          // 设置交换机名字
      
      // 开启事务
      //$channel->startTransaction();
      
      // 发送消息
      for ($i = 0; $i < 3; $i++) {
          $msg = 'msg' . $i;
          $exchange->publish($msg, $routingKey);
          echo "发送消息:[$msg] 成功\n";
          sleep(1);
      }
      
      // 提交事务
      //$channel->commitTransaction();
      
      // 关闭connection
      $connection->disconnect();
      
      echo "全部消息发送完成,程序退出。\n";
  • 运行消费者

    [admin@192 test]$ php RabbitmqConsumer.php
    正在接收消息:
    
  • 运行生产者

    [admin@192 test]$ php RabbitmqProducer.php

》升级版PHP案例

  • 说明

  • 代码组成

    • composer.phar

    • composer.json

      "require": {
          "php-amqplib/php-amqplib": ">=3.0"
        }
    • 消费者代码:Receive.php

      <?php
      
      require_once __DIR__ . '/../../../vendor/autoload.php';
      
      use PhpAmqpLib\Connection\AMQPStreamConnection;
      
      $connection = new AMQPStreamConnection('192.168.3.202', 5672, 'admin', '123456');
      $channel = $connection->channel();
      
      // 声明一个交换机
      $channel->exchange_declare('exchange01', AMQP_EX_TYPE_DIRECT, false, true, false);
      
      // 声明一个队列
      $channel->queue_declare('queue01', false, true, false, true);
      $channel->queue_declare('queue02', false, true, false, true);
      
      // 绑定队列和交换机并指定路由key
      $channel->queue_bind('queue01', 'exchange01', 'routing01');
      $channel->queue_bind('queue02', 'exchange01', 'routing01');
      
      echo " [*] Waiting for messages. To exit press CTRL+C\n";
      
      $callback = function (\PhpAmqpLib\Message\AMQPMessage $msg) {
          echo ' [x] Received ', $msg->body, "\n";
      };
      
      $channel->basic_consume('queue01', '', false, true, false, false, $callback);
      $channel->basic_consume('queue02', '', false, true, false, false, $callback);
      
      while ($channel->is_open()) {
          $channel->wait();
      }
    • 生产者代码:Send.php

      <?php
      
      require_once __DIR__ . '/../../../vendor/autoload.php';
      
      use PhpAmqpLib\Connection\AMQPStreamConnection;
      use PhpAmqpLib\Message\AMQPMessage;
      
      $connection = new AMQPStreamConnection('192.168.3.202', 5672, 'admin', '123456');
      $channel = $connection->channel();
      
      $channel->queue_declare('queue01', false, true, false, true);
      
      $msg = new AMQPMessage('Hello World!');
      $channel->basic_publish($msg, 'exchange01', 'routing01');
      
      echo " [x] Sent 'Hello World!'\n";
      
      $channel->close();
      $connection->close();
  • 运行消费者

    [admin@192 Rabbit]$ php Receive.php 
    [*] Waiting for messages. To exit press CTRL+C  
    [x] Received Hello World!
  • 运行生产者

    [admin@192 Rabbit]$ php Send.php 
    [x] Sent 'Hello World!' 

备注

  • 该教程部分内容收集自网络,感谢原作者。

附录

标签: PHP, RabbitMq

添加新评论


手机号仅后台超管可见,普通注册用户以及网站前台全站不可见,请勿担心泄露风险!