C#RabbitMQ等待指定超时的一条消息?

eoc*_*ron 11 c# rabbitmq

RabbitMQ中的解决方案等待带有超时的消息等待带有超时的单个RabbitMQ消息似乎不起作用,因为官方C#库中没有下一个交付方法并且QueueingBasicConsumer被删除,因此它只会在任何地方抛出NotSupportedException.

如何从队列中等待指定超时的单个消息?

PS

它可以通过Basic.Get()来完成,是的,但是,在指定的时间间隔(超额流量,多余的CPU)中拉取消息是不好的解决方案.

更新

EventingBasicConsumer by implmenetation不支持立即取消.即使您在某个时刻调用BasicCancel,即使您通过BasicQos指定预取- 它仍将在帧中获取,并且这些帧可以包含多个消息.因此,单个任务执行并不好.不要打扰 - 它只是不适用于单个消息.

Evk*_*Evk 8

有很多方法可以做到这一点.例如,你可以使用EventingBasicConsumer一起ManualResetEvent,像这样的(这只是用于演示目的-更好地利用以下方法之一):

var factory = new ConnectionFactory();
using (var connection = factory.CreateConnection()) {
    using (var channel = connection.CreateModel()) {
        // setup signal
        using (var signal = new ManualResetEvent(false)) {
            var consumer = new EventingBasicConsumer(channel);
            byte[] messageBody = null;                        
            consumer.Received += (sender, args) => {
                messageBody = args.Body;
                // process your message or store for later
                // set signal
                signal.Set();
            };               
            // start consuming
            channel.BasicConsume("your.queue", false, consumer);
            // wait until message is received or timeout reached
            bool timeout = !signal.WaitOne(TimeSpan.FromSeconds(10));
            // cancel subscription
            channel.BasicCancel(consumer.ConsumerTag);
            if (timeout) {
                // timeout reached - do what you need in this case
                throw new Exception("timeout");
            }

            // at this point messageBody is received
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

正如您在评论中所述 - 如果您希望同一队列中有多条消息,那么这不是最佳方式.嗯,这不是最好的方式,无论如何,我把它包括在内,以证明使用ManualResetEventin case库本身不提供超时支持.

如果您正在执行RPC(远程过程调用,请求 - 回复) - 您可以在服务器端SimpleRpcClient一起使用SimpleRpcServer.客户端将如下所示:

var client = new SimpleRpcClient(channel, "your.queue");
client.TimeoutMilliseconds = 10 * 1000;
client.TimedOut += (sender, args) => {
    // do something on timeout
};                    
var reply = client.Call(myMessage); // will return reply or null if timeout reached
Run Code Online (Sandbox Code Playgroud)

更简单的方法:使用基本Subscription类(它在EventingBasicConsumer内部使用相同的,但支持超时,因此您不需要自己实现),如下所示:

var sub = new Subscription(channel, "your.queue");
BasicDeliverEventArgs reply;
if (!sub.Next(10 * 1000, out reply)) {
     // timeout
}
Run Code Online (Sandbox Code Playgroud)