等待单个RabbitMQ消息超时

EMP*_*EMP 7 .net python amqp rabbitmq py-amqplib

我想向RabbitMQ服务器发送消息,然后等待回复消息(在"回复"队列上).当然,我不想永远等待处理这些消息的应用程序关闭 - 需要超时.这听起来像是一项非常基本的任务,但我找不到办法做到这一点.我现在用py-amqplibRabbitMQ .NET客户端遇到了这个问题.

到目前为止,我已经得到了最好的解决方案是使用轮询basic_getsleep在两者之间,但是这是很丑陋:

def _wait_for_message_with_timeout(channel, queue_name, timeout):
    slept = 0
    sleep_interval = 0.1

    while slept < timeout:
        reply = channel.basic_get(queue_name)
        if reply is not None:
            return reply

        time.sleep(sleep_interval)
        slept += sleep_interval

    raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)
Run Code Online (Sandbox Code Playgroud)

当然还有更好的方法吗?

EMP*_*EMP 8

这是我最终在.NET客户端中所做的事情:

protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs)
{
    var consumer = new QueueingBasicConsumer(Channel);
    var tag = Channel.BasicConsume(queueName, true, null, consumer);
    try
    {
        object result;
        if (!consumer.Queue.Dequeue(timeoutMs, out result))
            throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs / 1000.0));

        return ((BasicDeliverEventArgs)result).Body;
    }
    finally
    {
        Channel.BasicCancel(tag);
    }
}
Run Code Online (Sandbox Code Playgroud)

不幸的是,我不能对py-amqplib做同样的事情,因为basic_consume除非你调用channel.wait()并且channel.wait()不支持超时,否则它的方法不会调用回调!这个愚蠢的限制(我一直在遇到)意味着如果你从未收到另一条消息,你的线程将永远冻结.


ask*_*sol 8

我只是增加了超时的支持amqplibcarrot.

这是以下的子类amqplib.client0_8.Connection:

http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97

wait_multi是一个channel.wait能够在任意数量的频道上接收的版本.

我想这可能会在某些时候合并到上游.