简介:PHP实现rabbitmq的消费端和生成端具体代码
在之前的文章中我们已经学习了【RabbitMQ的安装】、【rabbitmq相关命令】、【PHP扩展amqp安装】。但是如何通过代码去实现队列的消费端和生成端呢?
接下来我们主要介绍PHP使用amqp扩展实现队列的消费端和生成端。
消费端
<?php
/**
* 队列消费者
*/
class RabbitConsumer {
//rabbitmq相关配置
private $config;
//交换机名
protected $exchange_name;
//队列名
protected $queue_name;
//路由key
protected $routing_key;
/**
* @var AMQPConnection $conn 链接对象
*/
protected $conn;
//通信通道
protected $channel;
//交换机对象
protected $exchange;
/**
* @var AMQPQueue $queue 队列对象
*/
protected $queue;
public function __construct()
{
$this->config = array(
'host'=>'127.0.0.1', //rabbitmq服务端IP地址
'port'=>5672, //rabbitmq服务端端口
'login'=>'guest', //登录账号
'password'=>'guest', //登录密码
'vhost'=>'/', //rabbitmq服务的虚拟机
'connect_timeout'=>30 //连接超时时间,单位:秒
);
$this->exchange_name = 'amq.direct';
$this->queue_name = 'test';
$this->routing_key = 'test';
$this->connect();
$this->getExChannel();
$this->getQueue();
}
//连接队列服务器
public function connect()
{
if(!$this->conn) {
$this->conn = new \AMQPConnection($this->config);
if(!$this->conn->connect()) {
throw new Exception('rabbitmq服务连接失败');
}
}
return $this->conn;
}
//交换机
public function getExChannel()
{
if (!$this->exchange){
//创建channel
$this->channel = new AMQPChannel($this->connect());
//创建交换机
$this->exchange = new AMQPExchange($this->channel);
//设置交换机名
$this->exchange->setName($this->exchange_name);
}
return $this->exchange;
}
//获取队列
public function getQueue()
{
if(!$this->queue){
//创建队列
$this->queue = new AMQPQueue($this->channel);
//设置队列名
$this->queue->setName($this->queue_name);
//注意如果已经提前创建了队列可以忽略下面这三步。重复创建也不会报错,rabbitmq会自动忽略
//设置持久化
$this->queue->setFlags(AMQP_DURABLE);
//创建队列,如果队列已存在不会重复创建,自动忽略
$this->queue->declareQueue();
//队列绑定交换机并设置路由key (如果已经绑定,rabbitmq会自动忽略,不会重复绑定)
if(!$this->queue->bind($this->exchange_name,$this->routing_key)){
throw new Exception('队列绑定路由失败!');
}
}
return $this->queue;
}
/**
* 消费队列
*/
public function consumeQueue()
{
while (true){
//获取队列消息,通过本类的logic方法处理实际的业务逻辑
$this->queue->consume([$this,'logic']);
}
}
/**
* 业务处理逻辑
* @param $resObj
* @param $queue
*/
protected function logic($resObj,$queue)
{
//消费队列实际业务逻辑
var_dump(json_decode($resObj->getBody(),true));
echo PHP_EOL;
//手动发送ACK应答,如果设置了consume第二个参数为【AMQP_AUTOACK】自动应答,请忽略
//实际应用中建议大家还是手动ACK应答,如果业务处理异常时可能需要重新处理一遍
$queue->ack($resObj->getDeliveryTag());
}
//对象释放
public function __destruct()
{
$this->conn->disconnect();
}
}
//实例化消费端对象
$consumer = new RabbitConsumer();
//消费队列,处理实际的业务逻辑
$consumer->consumeQueue();
上述代码实现后我们可以通过cli模式执行PHP脚本,命令格式如下:php RabbitConsumer.php
管理消费端进程
cli模式启动的脚本无法常驻内存中,或者说无法以守护进程的模式在后台启动。此外如果因为一些特殊原因,导致脚本进程被杀死怎么办呢?
关于上面的问题,可以利用之前给大家讲解的【supervisor进程管理器】来启动消费端的脚本。supervisor是python进程管理器,它可以使cli模式执行的进程在后台运行,还具备进程异常退出后自动启动的能力。具体操作大家可以看看我之前的supervisor相关介绍,我这里给出rabbitmq启动脚本的配置文件信息:
[program:rabbit_consumer]
directory=/data/www/rabbitmq_test/
command=php rabbit_consumer.php
user=root
autostart=false
autorestart=true
startsecs=1
startretries=3
stderr_logfile=/data/www/rabbitmq_test/rabbitmq_consumer_err.log
stdout_logfile=/data/www/rabbitmq_test/rabbitmq_consumer_out.log
配置完成后记得使用【systemctl restart supervisord】重启supervisor使其生效。
supervisor启动消费端:supervisorctl start rabbit_consumer
生产端
<?php
/**
* 队列生成者
*/
class RabbitPublisher
{
//rabbitmq相关配置
private $config;
//交换机名
protected $exchange_name;
//路由key
protected $routing_key;
/**
* @var AMQPConnection $conn 链接对象
*/
protected $conn;
//通信通道
protected $channel;
/**
* @var AMQPExchange $exchange 交换机对象
*/
protected $exchange;
public function __construct()
{
$this->config = array(
'host' => '127.0.0.1', //rabbitmq服务端IP地址
'port' => 5672, //rabbitmq服务端端口
'login' => 'guest', //登录账号
'password' => 'guest', //登录密码
'vhost' => '/', //rabbitmq服务的虚拟机
'write_timeout'=>30, //写入超时时间,单位:秒
'connect_timeout' => 30 //连接超时时间,单位:秒
);
//如果交换机的名称为空字符串,会使用默认的交换机上
$this->exchange_name = 'amq.direct';
$this->routing_key = 'test';
}
//连接队列服务器
public function connect()
{
if (!$this->conn) {
$this->conn = new \AMQPConnection($this->config);
if (!$this->conn->connect()) {
throw new Exception('rabbitmq服务连接失败');
}
}
return $this->conn;
}
//交换机
public function getExChannel()
{
if (!$this->exchange) {
//创建channel
$this->channel = new AMQPChannel($this->connect());
//创建交换机对象
$this->exchange = new AMQPExchange($this->channel);
//设置交换机名
$this->exchange->setName($this->exchange_name);
}
return $this->exchange;
}
/**
* 发送消息
*/
public function publish($msg_str)
{
if(is_array($msg_str)) $msg_str = json_encode($msg_str,JSON_UNESCAPED_UNICODE);
$this->getExChannel()->publish($msg_str,$this->routing_key);
}
//释放rabbmq连接
public function __destruct()
{
$this->conn->disconnect();
}
}
$publisher = new RabbitPublisher();
//发送消息到队列中
$msg = ['id'=>'自定义ID,例如:订单id','a'=>'其他内容字段'];
$publisher->publish($msg);
生产端代码也很简单,就是连接rabbitmq之后发送一个消息即可。
注意了,上述代码中我是用的交换机是直连的方式【amq.direct】,这个交换机是系统默认的。这个需要根据我们自己的需求进行修改。
路由key是队列用于匹配的规则,队列会根据自己的路由key规则获取相关消息列表。其实就是告诉交换机当前消息应该发送给哪些队列的作用。
获取队列中待处理的消息数
有的时候需要监听待处理的消息数,以方便优化队列服务。我们可以通过 declareQueue 声明队列方法获取队列待处理消息数。具体代码如下:
<?php
$config = array(
'host'=>'127.0.0.1', //rabbitmq服务端IP地址
'port'=>5672, //rabbitmq服务端端口
'login'=>'guest', //登录账号
'password'=>'guest', //登录密码
'vhost'=>'/', //rabbitmq服务的虚拟机
'connect_timeout'=>30 //连接超时时间,单位:秒
);
//连接rabbitmq
$connect = new \AMQPConnection($config);
if (!$connect->connect()) exit('连接失败');
//创建信通
$channel = new \AMQPChannel($connect);
$queue = new \AMQPQueue($channel);
$queue->setName('test');
$queue->setFlags(AMQP_DURABLE);
$queue->setArgument('x-max-priority',10);
$count = $queue->declareQueue();
echo '待处理消息数:'.$count;
注意:declareQueue声明队列时返回的是队列中待处理消息的数量。此外,队列的参数必须和首次声明时的参数一样,不可缺少,否则会报错
有遗漏或者不对的可以在我的公众号留言哦