PHP实现消费端及生产端具体代码

简介:PHP实现rabbitmq的消费端和生成端具体代码

在之前的文章中我们已经学习了【RabbitMQ的安装】、【rabbitmq相关命令】、【PHP扩展amqp安装】。但是如何通过代码去实现队列的消费端和生成端呢?

接下来我们主要介绍PHP使用amqp扩展实现队列的消费端和生成端。

消费端

<?php
/**
 * 队列消费者
 */
class RabbitConsumer {
    //rabbitmq相关配置
    private $config;

    //交换机名
    protected $exchange_name;

    //队列名
    protected $queue_name;

    //路由key
    protected $routing_key;

    /**
     * @var AMQPConnection $conn 链接对象
     */
    protected $conn;

    //通信通道
    protected $channel;

    //交换机对象
    protected $exchange;

    /**
     * @var AMQPQueue $queue 队列对象
     */
    protected $queue;

    public function __construct()
    {
        $this->config = array(
            'host'=>'127.0.0.1', //rabbitmq服务端IP地址
            'port'=>5672, //rabbitmq服务端端口
            'login'=>'guest', //登录账号
            'password'=>'guest', //登录密码
            'vhost'=>'/', //rabbitmq服务的虚拟机
            'connect_timeout'=>30 //连接超时时间,单位:秒
        );
        $this->exchange_name = 'amq.direct';
        $this->queue_name = 'test';
        $this->routing_key = 'test';
        $this->connect();
        $this->getExChannel();
        $this->getQueue();
    }

    //连接队列服务器
    public function connect()
    {
        if(!$this->conn) {
            $this->conn = new \AMQPConnection($this->config);
            if(!$this->conn->connect()) {
                throw new Exception('rabbitmq服务连接失败');
            }
        }
        return $this->conn;
    }

    //交换机
    public function getExChannel()
    {
        if (!$this->exchange){
            //创建channel
            $this->channel = new AMQPChannel($this->connect());
            //创建交换机
            $this->exchange = new AMQPExchange($this->channel);

            //设置交换机名
            $this->exchange->setName($this->exchange_name);
        }
        return $this->exchange;
    }

    //获取队列
    public function getQueue()
    {
        if(!$this->queue){
            //创建队列
            $this->queue = new AMQPQueue($this->channel);
            //设置队列名
            $this->queue->setName($this->queue_name);
            //注意如果已经提前创建了队列可以忽略下面这三步。重复创建也不会报错,rabbitmq会自动忽略

            //设置持久化
            $this->queue->setFlags(AMQP_DURABLE);
            //创建队列,如果队列已存在不会重复创建,自动忽略
            $this->queue->declareQueue();
            //队列绑定交换机并设置路由key (如果已经绑定,rabbitmq会自动忽略,不会重复绑定)
            if(!$this->queue->bind($this->exchange_name,$this->routing_key)){
                throw new Exception('队列绑定路由失败!');
            }
        }
        return $this->queue;
    }

    /**
     * 消费队列
     */
    public function consumeQueue()
    {
        while (true){
            //获取队列消息,通过本类的logic方法处理实际的业务逻辑
            $this->queue->consume([$this,'logic']);
        }
    }

    /**
     * 业务处理逻辑
     * @param $resObj
     * @param $queue
     */
    protected function logic($resObj,$queue)
    {
        //消费队列实际业务逻辑
        var_dump(json_decode($resObj->getBody(),true));
        echo PHP_EOL;
        //手动发送ACK应答,如果设置了consume第二个参数为【AMQP_AUTOACK】自动应答,请忽略
        //实际应用中建议大家还是手动ACK应答,如果业务处理异常时可能需要重新处理一遍
        $queue->ack($resObj->getDeliveryTag());
    }

    //对象释放
    public function __destruct()
    {
        $this->conn->disconnect();
    }
}
//实例化消费端对象
$consumer = new RabbitConsumer();
//消费队列,处理实际的业务逻辑
$consumer->consumeQueue();

上述代码实现后我们可以通过cli模式执行PHP脚本,命令格式如下:php RabbitConsumer.php

管理消费端进程

cli模式启动的脚本无法常驻内存中,或者说无法以守护进程的模式在后台启动。此外如果因为一些特殊原因,导致脚本进程被杀死怎么办呢?

关于上面的问题,可以利用之前给大家讲解的【supervisor进程管理器】来启动消费端的脚本。supervisor是python进程管理器,它可以使cli模式执行的进程在后台运行,还具备进程异常退出后自动启动的能力。具体操作大家可以看看我之前的supervisor相关介绍,我这里给出rabbitmq启动脚本的配置文件信息:

[program:rabbit_consumer]
directory=/data/www/rabbitmq_test/
command=php rabbit_consumer.php
user=root
autostart=false
autorestart=true
startsecs=1
startretries=3
stderr_logfile=/data/www/rabbitmq_test/rabbitmq_consumer_err.log
stdout_logfile=/data/www/rabbitmq_test/rabbitmq_consumer_out.log

配置完成后记得使用【systemctl restart supervisord】重启supervisor使其生效。

supervisor启动消费端:supervisorctl start rabbit_consumer

 

生产端

<?php

/**
 * 队列生成者
 */
class RabbitPublisher
{
    //rabbitmq相关配置
    private $config;

    //交换机名
    protected $exchange_name;

    //路由key
    protected $routing_key;

    /**
     * @var AMQPConnection $conn 链接对象
     */
    protected $conn;

    //通信通道
    protected $channel;

    /**
     * @var AMQPExchange $exchange 交换机对象
     */
    protected $exchange;

    public function __construct()
    {
        $this->config = array(
            'host' => '127.0.0.1', //rabbitmq服务端IP地址
            'port' => 5672, //rabbitmq服务端端口
            'login' => 'guest', //登录账号
            'password' => 'guest', //登录密码
            'vhost' => '/', //rabbitmq服务的虚拟机
            'write_timeout'=>30, //写入超时时间,单位:秒
            'connect_timeout' => 30 //连接超时时间,单位:秒
        );
        //如果交换机的名称为空字符串,会使用默认的交换机上
        $this->exchange_name = 'amq.direct';
        $this->routing_key = 'test';
    }

    //连接队列服务器
    public function connect()
    {
        if (!$this->conn) {
            $this->conn = new \AMQPConnection($this->config);
            if (!$this->conn->connect()) {
                throw new Exception('rabbitmq服务连接失败');
            }
        }
        return $this->conn;
    }

    //交换机
    public function getExChannel()
    {
        if (!$this->exchange) {
            //创建channel
            $this->channel = new AMQPChannel($this->connect());

            //创建交换机对象
            $this->exchange = new AMQPExchange($this->channel);

            //设置交换机名
            $this->exchange->setName($this->exchange_name);
        }
        return $this->exchange;
    }

    /**
     * 发送消息
     */
    public function publish($msg_str)
    {
        if(is_array($msg_str)) $msg_str = json_encode($msg_str,JSON_UNESCAPED_UNICODE);
        $this->getExChannel()->publish($msg_str,$this->routing_key);
    }

    //释放rabbmq连接
    public function __destruct()
    {
        $this->conn->disconnect();
    }
}

$publisher = new RabbitPublisher();
//发送消息到队列中
$msg = ['id'=>'自定义ID,例如:订单id','a'=>'其他内容字段'];
$publisher->publish($msg);

生产端代码也很简单,就是连接rabbitmq之后发送一个消息即可。

注意了,上述代码中我是用的交换机是直连的方式【amq.direct】,这个交换机是系统默认的。这个需要根据我们自己的需求进行修改。

路由key是队列用于匹配的规则,队列会根据自己的路由key规则获取相关消息列表。其实就是告诉交换机当前消息应该发送给哪些队列的作用。

获取队列中待处理的消息数

有的时候需要监听待处理的消息数,以方便优化队列服务。我们可以通过 declareQueue 声明队列方法获取队列待处理消息数。具体代码如下:

<?php
$config = array(
    'host'=>'127.0.0.1', //rabbitmq服务端IP地址
    'port'=>5672, //rabbitmq服务端端口
    'login'=>'guest', //登录账号
    'password'=>'guest', //登录密码
    'vhost'=>'/', //rabbitmq服务的虚拟机
    'connect_timeout'=>30 //连接超时时间,单位:秒
);
//连接rabbitmq
$connect = new \AMQPConnection($config);
if (!$connect->connect()) exit('连接失败');

//创建信通
$channel = new \AMQPChannel($connect);

$queue = new \AMQPQueue($channel);
$queue->setName('test');
$queue->setFlags(AMQP_DURABLE);
$queue->setArgument('x-max-priority',10);
$count = $queue->declareQueue();
echo '待处理消息数:'.$count;

注意:declareQueue声明队列时返回的是队列中待处理消息的数量。此外,队列的参数必须和首次声明时的参数一样,不可缺少,否则会报错

有遗漏或者不对的可以在我的公众号留言哦

编程经验共享公众号二维码

编程经验共享公众号二维码
更多内容关注公众号
Copyright © 2021 编程经验共享 赣ICP备2021010401号-1