PHP AMQP的延迟队列的实现

Che*_*eng 16 php amqp rabbitmq

最近,我在生产者/消费者队列系统上做了快速实现.

<?php
namespace Queue;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;    

class Amqp
{
    private $connection;
    private $queueName;
    private $delayedQueueName;
    private $channel;
    private $callback;

    public function __construct($host, $port, $login, $password, $queueName)
    {
        $this->connection = new AMQPStreamConnection($host, $port, $login, $password);
        $this->queueName = $queueName;
        $this->delayedQueueName = null;
        $this->channel = $this->connection->channel();
        // First, we need to make sure that RabbitMQ will never lose our queue.
        // In order to do so, we need to declare it as durable. To do so we pass
        // the third parameter to queue_declare as true.
        $this->channel->queue_declare($queueName, false, true, false, false);
    }

    public function __destruct()
    {
        $this->close();
    }

    // Just in case : http://stackoverflow.com/questions/151660/can-i-trust-php-destruct-method-to-be-called
    // We should call close explicitly if possible.
    public function close()
    {
        if (!is_null($this->channel)) {
            $this->channel->close();
            $this->channel = null;
        }

        if (!is_null($this->connection)) {
            $this->connection->close();
            $this->connection = null;
        }
    }

    public function produceWithDelay($data, $delay)
    {
        if (is_null($this->delayedQueueName))
        {
            $delayedQueueName = $this->queueName . '.delayed';

            // First, we need to make sure that RabbitMQ will never lose our queue.
            // In order to do so, we need to declare it as durable. To do so we pass
            // the third parameter to queue_declare as true.
            $this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false,
                new AMQPTable(array(
                    'x-dead-letter-exchange' => '',
                    'x-dead-letter-routing-key' => $this->queueName
                ))
            );

            $this->delayedQueueName = $delayedQueueName;
        }

        $msg = new AMQPMessage(
            $data,
            array(
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'expiration' => $delay
            )
        );

        $this->channel->basic_publish($msg, '', $this->delayedQueueName);
    }

    public function produce($data)
    {
        $msg = new AMQPMessage(
            $data,
            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
        );

        $this->channel->basic_publish($msg, '', $this->queueName);
    }

    public function consume($callback)
    {
        $this->callback = $callback;

        // This tells RabbitMQ not to give more than one message to a worker at
        // a time.
        $this->channel->basic_qos(null, 1, null);

        // Requires ack.
        $this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'consumeCallback'));

        while(count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    public function consumeCallback($msg)
    {
        call_user_func_array(
            $this->callback,
            array($msg)
        );

        // Very important to ack, in order to remove msg from queue. Ack after
        // callback, as exception might happen in callback.
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }

    public function getQueueSize()
    {
        // three tuple containing (<queue name>, <message count>, <consumer count>)
        $tuple = $this->channel->queue_declare($this->queueName, false, true, false, false);
        if ($tuple != null && isset($tuple[1])) {
            return $tuple[1];
        }
        return -1;
    }
}
Run Code Online (Sandbox Code Playgroud)

public function producepublic function consume对按预期工作.

但是,当它带有延迟队列系统时

public function produceWithDelay并且public function consume对不能按预期工作.呼叫consume,不能接收任何物品,甚至等待一段时间的消费者.

我相信我的produceWithDelay实施不对.我可以知道这有什么不对吗?

Che*_*eng 1

作为旁注。

我发现这是我自己的bug造成的。

代替

    if (is_null($this->delayedQueueName))
    {
        $delayedQueueName = $this->queueName . '.delayed';

        $this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false,
        ...

        $this->delayedQueueName = $delayedQueueName;
    }
Run Code Online (Sandbox Code Playgroud)

我应该把它写在

    if (is_null($this->delayedQueueName))
    {
        $delayedQueueName = $this->queueName . '.delayed';

        $this->channel->queue_declare(delayedQueueName, false, true, false, false, false,
        ...

        $this->delayedQueueName = $delayedQueueName;
    }
Run Code Online (Sandbox Code Playgroud)

我的成员变量尚未正确初始化。

完整可行的代码如下,供您参考。

<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class Amqp
{
    private $connection;
    private $queueName;
    private $delayedQueueName;
    private $channel;
    private $callback;

    public function __construct($host, $port, $login, $password, $queueName)
    {
        $this->connection = new AMQPStreamConnection($host, $port, $login, $password);
        $this->queueName = $queueName;
        $this->delayedQueueName = null;
        $this->channel = $this->connection->channel();
        $this->channel->queue_declare($queueName, false, true, false, false);
    }

    public function __destruct()
    {
        $this->close();
    }

    public function close()
    {
        if (!is_null($this->channel)) {
            $this->channel->close();
            $this->channel = null;
        }

        if (!is_null($this->connection)) {
            $this->connection->close();
            $this->connection = null;
        }
    }

    public function produceWithDelay($data, $delay)
    {
        if (is_null($this->delayedQueueName))
        {
            $delayedQueueName = $this->queueName . '.delayed';

            $this->channel->queue_declare($delayedQueueName, false, true, false, false, false,
                new AMQPTable(array(
                    'x-dead-letter-exchange' => '',
                    'x-dead-letter-routing-key' => $this->queueName
                ))
            );

            $this->delayedQueueName = $delayedQueueName;
        }

        $msg = new AMQPMessage(
            $data,
            array(
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'expiration' => $delay
            )
        );

        $this->channel->basic_publish($msg, '', $this->delayedQueueName);
    }

    public function produce($data)
    {
        $msg = new AMQPMessage(
            $data,
            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
        );

        $this->channel->basic_publish($msg, '', $this->queueName);
    }

    public function consume($callback)
    {
        $this->callback = $callback;

        $this->channel->basic_qos(null, 1, null);

        $this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'callback'));

        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    public function callback($msg)
    {
        call_user_func_array(
            $this->callback,
            array($msg)
        );

        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
}
Run Code Online (Sandbox Code Playgroud)