使用 PhpAmqpLib 从rabbitmq消费者的回调中获取队列大小

13D*_*GeR 4 php queue rabbitmq php-amqplib

我想记录工作人员回调的工作状态,并在左侧队列中包含一些消息。

到目前为止,我找到的唯一解决方案是获取queue_declare结果数组的第二个成员,但这应该在每个工作进程启动时调用一次,并且我需要在每条新消息中更新信息。

UPD:基于IMSoP 答案的解决方案:

<?php
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('test1');
echo "[*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) use ($channel) {
    list (, $cn) = $channel->queue_declare('test1', true);
    echo ' [x] Received ', $msg->body, " $cn left";
    for ($i = 0; $i < $msg->body; ++$i) {
        sleep(1);
        echo '.';
    }
    echo "\n";
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('test1', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
    $channel->wait();
}
Run Code Online (Sandbox Code Playgroud)

由于某种原因,消息计数始终为 0。

IMS*_*SoP 6

queue_declare方法有一个名为“passive”的参数,可用于此目的:它仅通过名称检查队列是否存在,并忽略任何其他参数。

根据AMQP 文档

如果设置,如果队列已存在同名,服务器将回复 Declare-Ok,如果不存在,则引发错误。客户端可以使用它来检查队列是否存在,而无需修改服务器状态。设置后,除 name 和 no-wait 之外的所有其他方法字段都将被忽略。同时带有被动和不等待的声明没有效果。比较参数的语义等价性。

请注意,这Declare-Ok不仅仅是一个状态,而是完整响应结构的名称,其中包含字段queuemessage-countconsumer-count

在 PHP-AMQPLib 中,您可以使用它来记录一组队列的状态,如下所示:

foreach ( $this->registeredQueues as $queueName ) {
    // The second parameter to queue_declare is $passive
    // When set to true, everything else is ignored, so need not be passed
    list($queueName, $messageCount, $consumerCount)
        = $this->rabbitChannel->queue_declare($queueName, true);

    $this->logger->info(
        "Queue $queueName has $messageCount messages and $consumerCount active consumers."
    );
}
Run Code Online (Sandbox Code Playgroud)

  • 由于某种原因,消息计数始终为 0。我用代码示例更新了问题。 (2认同)