消费者不承认来自RabbitMq的消息

Bra*_*sen 14 php amqp rabbitmq

我创建了一个简单的发布者和使用者在队列上订阅的消费者basic.consume.

我的消费者在作业无异常运行时确认消息.每当我遇到异常时,我都不会收到消息并提前返回.只有已确认的消息才会从队列中消失,因此工作正常.
现在我希望消费者再次接收失败的消息,但重新生成这些消息的唯一方法是重新启动消费者.

我该如何处理这个用例?

设置代码

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);

$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declare();
$queue->bind('my-exchange');
Run Code Online (Sandbox Code Playgroud)

消费者代码

$queue->consume(array($this, 'callback'));

public function callback(AMQPEnvelope $msg)
{
    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return;
    }
    return $queue->ack($msg->getDeliveryTag());
}
Run Code Online (Sandbox Code Playgroud)

制片人代码

$exchange->publish('message');
Run Code Online (Sandbox Code Playgroud)

pin*_*ain 20

如果消息未被确认且应用程序失败,它将自动重新redelivered传送,并且信封上的属性将被设置为true(除非您使用no-ack = true标志消费它们).

UPD:

您必须nack在catch块中使用redelivery标志发送消息

    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
    }
Run Code Online (Sandbox Code Playgroud)

谨防无限次消息,而重新传递计数在RabbitMQ和AMQP协议中根本没有实现.

如果你不想搞这些消息并且只是想添加一些延迟,你可能想要添加一些sleep()usleep()之前的nack方法调用,但它根本不是一个好主意.

有多种技术可以处理循环重新传递问题:

1.依靠死信交换

  • 优点:可靠,标准,清晰
  • 缺点:需要额外的逻辑

2.使用每条消息或每个队列TTL

  • 优点:易于实施,也标准,清晰
  • 缺点:排长队你可能会失去一些信息

示例(注意,对于队列ttl,我们只传递数字和消息ttl - 任何将是数字字符串的东西):

2.1每条消息ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'expiration' => '1000'
    )
);
Run Code Online (Sandbox Code Playgroud)

2.2.每队列ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish('message at ' . microtime(true));
Run Code Online (Sandbox Code Playgroud)

3.在邮件正文或标题中保留redelivers计数或保留redelivers编号(即IP堆栈中的跳跃限制或ttl)

  • 专业人士:在应用程序级别为您提供额外的消息生存期控制
  • 缺点:重要的开销,而你必须修改消息并再次发布,特定于应用程序,不清楚

码:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'headers' => array(
            'ttl' => 100
        )
    )
);

$queue->consume(
    function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
        $headers = $msg->getHeaders();
        echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
        echo $msg->getDeliveryTag(), ' ';
        echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
        echo $msg->getBody(), PHP_EOL;

        try {
            //Do some business logic
            throw new Exception('business logic failed');
        } catch (Exception $ex) {
            //Log exception
            if (isset($headers['ttl'])) {
                // with ttl logic

                if ($headers['ttl'] > 0) {
                    $headers['ttl']--;

                    $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
                }

                return $queue->ack($msg->getDeliveryTag());
            } else {
                // without ttl logic
                return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
            }

        }

        return $queue->ack($msg->getDeliveryTag());
    }
);
Run Code Online (Sandbox Code Playgroud)

可能有一些其他方法可以更好地控制消息redelivers流.

结论:没有银弹解决方案.您必须确定最适合您需求的解决方案或找到其他解决方案,但不要忘记在此处分享;)