PHP-RabbitMQ-PHP入门
说明
- RabbitMQ-PHP入门
- 环境:CentOS7+PHP7.3
操作步骤
》安装RabbitMQ
- 可自行安装,也可参考我的相关教程(CentOS7离线安装RabbitMq),本章假设你已经安装好了RabbitMQ。
》PHP安装AMQP扩展
- 下载地址:https://pecl.php.net/package/amqp
大小:110k+-
amqp-1.10.2.tgz
安装AMQP扩展需要先安装rabbitmq-c,否则安装不上
- 下载地址:https://github.com/alanxz/rabbitmq-c/releases
大小:150k+-
rabbitmq-c-0.10.0.tar.gz
将下载好的压缩包放入CentOS7能访问到的目录
[admin@192 src]$ ll -rw-r--r-- 1 501 games 107350 Dec 19 23:13 amqp-1.10.2.tgz -rw-r--r-- 1 501 games 145361 Dec 19 23:51 rabbitmq-c-0.10.0.tar.gz
安装rabbitmq-c
[admin@192 src]$ tar zxvf rabbitmq-c-0.10.0.tar.gz [admin@192 src]$ cd rabbitmq-c-0.10.0 [admin@192 rabbitmq-c-0.10.0]$ cmake . -DCMAKE_INSTALL_PREFIX=/usr/local/rabbitmq-c-0.10.0 [admin@192 rabbitmq-c-0.10.0]$ make # 非root这一步需要加sudo [admin@192 rabbitmq-c-0.10.0]$ sudo make install
安装AMQP
[admin@192 src]$ tar zxvf amqp-1.10.2.tgz [admin@192 src]$ cd amqp-1.10.2 # 生成编译文件 [admin@192 amqp-1.10.2]$ phpize Configuring for: PHP Api Version: 20180731 Zend Module Api No: 20180731 Zend Extension Api No: 320180731 # 查看本机php-config位置 [admin@192 amqp-1.10.2]$ sudo find / -name php-config /usr/local/php/bin/php-config # 配置 [admin@192 amqp-1.10.2]$ ./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c-0.10.0 # make [admin@192 amqp-1.10.2]$ make # make 报错 /usr/bin/ld: cannot find -lrabbitmq collect2: error: ld returned 1 exit status make: *** [amqp.la] Error 1 # 复制/usr/local/rabbitmq-c-0.10.0/lib64一份到/usr/local/rabbitmq-c-0.10.0/lib [admin@192 amqp-1.10.2]$ cd /usr/local/rabbitmq-c-0.10.0/ [admin@192 rabbitmq-c-0.10.0]$ ll total 0 drwxr-xr-x 2 root root 92 Dec 20 00:54 include drwxr-xr-x 3 root root 118 Dec 20 00:54 lib64 [admin@192 rabbitmq-c-0.10.0]$ sudo cp -R ./lib64 ./lib # 返回amqp-1.10.2的解压缩目录继续make [admin@192 amqp-1.10.2]$ make ---------------------------------------------------------------------- Libraries have been installed in: /usr/local/src/RabbitMQ/amqp-1.10.2/modules If you ever happen to want to link against installed libraries in a given directory, LIBDIR, you must either use libtool, and specify the full pathname of the library, or use the `-LLIBDIR' flag during linking and do at least one of the following: - add LIBDIR to the `LD_LIBRARY_PATH' environment variable during execution - add LIBDIR to the `LD_RUN_PATH' environment variable during linking - use the `-Wl,--rpath -Wl,LIBDIR' linker flag - have your system administrator add LIBDIR to `/etc/ld.so.conf' See any operating system documentation about shared libraries for more information, such as the ld(1) and ld.so(8) manual pages. ---------------------------------------------------------------------- Build complete. Don't forget to run 'make test'. # make install,非root这一步需要加sudo [admin@192 amqp-1.10.2]$ sudo make install Installing shared extensions: /usr/local/php/lib/php/extensions/no-debug-non-zts-20180731/ # 查看扩展 [admin@192 amqp-1.10.2]$ ll /usr/local/php/lib/php/extensions/no-debug-non-zts-20180731/ total 9672 -rwxr-xr-x 1 root root 656864 Dec 20 01:03 amqp.so # php.ini开启扩展 [admin@192 amqp-1.10.2]$ sudo find / -name php.ini /usr/local/php/etc/php.ini [admin@192 amqp-1.10.2]$ sudo vi /usr/local/php/etc/php.ini extension=amqp [admin@192 amqp-1.10.2]$ php -m [PHP Modules] amqp # 重启php-fpm [admin@192 amqp-1.10.2]$ sudo lnmp php-fpm restart
》简单PHP案例
说明
- 纯原声PHP代码实现方式,不借助三方包,只要PHP安装好amqp扩展即可。
代码组成
消费者代码:RabbitmqConsumer.php
<?php //配置信息 $conf = [ 'host' => '192.168.3.202', 'port' => '5672', 'login' => 'admin', 'password' => '123456', 'vhost' => '/' ]; // 交换机名字 $exchangeName = 'exchange001'; // 路由key名字 $routingKey = 'route001'; // 队列名字 $queueName001 = 'exchange001-queue001'; $queueName002 = 'exchange001-queue002'; // 创建connection $connection = new \AMQPConnection($conf); $connection->connect(); // 创建channel $channel = new \AMQPChannel($connection); // 初始化一个交换机并创建 $exchange = new \AMQPExchange($channel); $exchange->setName($exchangeName); // 设置交换机名字 $exchange->setType(AMQP_EX_TYPE_DIRECT); // 设置类型:直接交换类型 $exchange->setFlags(AMQP_DURABLE); // 设置标识:持久交换机 $res = $exchange->declareExchange(); // 声明交换机 // 创建队列001 $queue001 = new \AMQPQueue($channel); $queue001->setName($queueName001); $queue001->setFlags(AMQP_DURABLE); $res = $queue001->declareQueue(); // 队列001绑定到交换机,并指定路由key $res = $queue001->bind($exchangeName, $routingKey); // 消费消息 echo "正在接收消息:\n"; // consume方法:是一个阻塞方法,有数据执行,没数据则一直阻塞状态 // $queue->consume('callbackFunction', AMQP_AUTOACK); // 自动应答 $queue001->consume('callbackFunction'); // get方法:是consume的非阻塞替代方法 // $msg = $queue->get(AMQP_AUTOACK); // 创建队列002 /** * 注意:由于consume方法是阻塞模式,所以之后的代码就不再执行了,故而队列002的代码不会执行 */ /* $queue002 = new \AMQPQueue($channel); $queue002->setName($queueName002); $queue002->setFlags(AMQP_DURABLE); $res = $queue002->declareQueue(); // 队列002绑定到交换机,并指定路由key $res = $queue002->bind($exchangeName, $routingKey); // 消费消息 $queue002->consume('callbackFunction'); */ // 消费消息回调方法 function callbackFunction(\AMQPEnvelope $envelope, \AMQPQueue $queue) { $msg = $envelope->getBody(); // 业务逻辑 echo '[' . date('YmdHis') . '] 队列:' . $queue->getName() . ' 成功处理了一条消息:' . $msg . "\n"; //处理消息 $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 }
生产者代码:RabbitmqProducer.php
<?php //配置信息 $conf = [ 'host' => '192.168.3.202', 'port' => '5672', 'login' => 'admin', 'password' => '123456', 'vhost' => '/' ]; // 交换机名字 $exchangeName = 'exchange001'; // 路由key名字 $routingKey = 'route001'; // 创建connection $connection = new \AMQPConnection($conf); $connection->connect(); // 创建channel $channel = new \AMQPChannel($connection); // 创建交换机 $exchange = new \AMQPExchange($channel); $exchange->setName($exchangeName); // 设置交换机名字 // 开启事务 //$channel->startTransaction(); // 发送消息 for ($i = 0; $i < 3; $i++) { $msg = 'msg' . $i; $exchange->publish($msg, $routingKey); echo "发送消息:[$msg] 成功\n"; sleep(1); } // 提交事务 //$channel->commitTransaction(); // 关闭connection $connection->disconnect(); echo "全部消息发送完成,程序退出。\n";
运行消费者
[admin@192 test]$ php RabbitmqConsumer.php 正在接收消息:
运行生产者
[admin@192 test]$ php RabbitmqProducer.php
》升级版PHP案例
说明
- 官方文档:https://www.rabbitmq.com/tutorials/tutorial-one-php.html
- 按照RabbitMQ官方文档,除了PHP安装amqp扩展外,通过composer依赖管理工具安装名为:“php-amqplib/php-amqplib”的依赖包。
代码组成
composer.phar
composer.json
"require": { "php-amqplib/php-amqplib": ">=3.0" }
消费者代码:Receive.php
<?php require_once __DIR__ . '/../../../vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.3.202', 5672, 'admin', '123456'); $channel = $connection->channel(); // 声明一个交换机 $channel->exchange_declare('exchange01', AMQP_EX_TYPE_DIRECT, false, true, false); // 声明一个队列 $channel->queue_declare('queue01', false, true, false, true); $channel->queue_declare('queue02', false, true, false, true); // 绑定队列和交换机并指定路由key $channel->queue_bind('queue01', 'exchange01', 'routing01'); $channel->queue_bind('queue02', 'exchange01', 'routing01'); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function (\PhpAmqpLib\Message\AMQPMessage $msg) { echo ' [x] Received ', $msg->body, "\n"; }; $channel->basic_consume('queue01', '', false, true, false, false, $callback); $channel->basic_consume('queue02', '', false, true, false, false, $callback); while ($channel->is_open()) { $channel->wait(); }
生产者代码:Send.php
<?php require_once __DIR__ . '/../../../vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('192.168.3.202', 5672, 'admin', '123456'); $channel = $connection->channel(); $channel->queue_declare('queue01', false, true, false, true); $msg = new AMQPMessage('Hello World!'); $channel->basic_publish($msg, 'exchange01', 'routing01'); echo " [x] Sent 'Hello World!'\n"; $channel->close(); $connection->close();
运行消费者
[admin@192 Rabbit]$ php Receive.php [*] Waiting for messages. To exit press CTRL+C [x] Received Hello World!
运行生产者
[admin@192 Rabbit]$ php Send.php [x] Sent 'Hello World!'
备注
- 该教程部分内容收集自网络,感谢原作者。
附录
- 无