我创建了一个简单的发布者和使用者在队列上订阅的消费者basic.consume.
我的消费者在作业无异常运行时确认消息.每当我遇到异常时,我都不会收到消息并提前返回.只有已确认的消息才会从队列中消失,因此工作正常.
现在我希望消费者再次接收失败的消息,但重新生成这些消息的唯一方法是重新启动消费者.
我该如何处理这个用例?
设置代码
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declare();
$queue->bind('my-exchange');
Run Code Online (Sandbox Code Playgroud)
消费者代码
$queue->consume(array($this, 'callback'));
public function callback(AMQPEnvelope $msg)
{
try {
//Do some business logic
} catch (Exception $ex) {
//Log exception
return;
}
return $queue->ack($msg->getDeliveryTag());
}
Run Code Online (Sandbox Code Playgroud)
制片人代码
$exchange->publish('message');
Run Code Online (Sandbox Code Playgroud)