在异常时重新排队消息

Snæ*_*ørn 5 c# publish-subscribe rabbitmq easynetq

我正在寻找一种可靠的方法来重新排队无法正确处理的消息 - 目前.

我一直在关注http://dotnetcodr.com/2014/06/16/rabbitmq-in-net-c-basic-error-handling-in-receiver/,似乎支持在RabbitMQ中重新排队消息API.

else //reject the message but push back to queue for later re-try
{
    Console.WriteLine("Rejecting message and putting it back to the queue: {0}", message);
    model.BasicReject(deliveryArguments.DeliveryTag, true);
}
Run Code Online (Sandbox Code Playgroud)

但是我正在使用EasyNetQ.所以想知道我将如何做类似的事情.

bus.Subscribe<MyMessage>("my_subscription_id", msg => {
    try
    {
        // do work... could be long running
    }
    catch ()
    {
        // something went wrong - requeue message
    }
});
Run Code Online (Sandbox Code Playgroud)

这甚至是一个好方法吗?不ACK,如果该消息可能导致的问题do work超过了等待ACK由RabbitMQ的服务器超时.

Snæ*_*ørn 7

所以我提出了这个解决方案.这取代了EasyNetQ的默认错误策略.

public class DeadLetterStrategy : DefaultConsumerErrorStrategy
{
    public DeadLetterStrategy(IConnectionFactory connectionFactory, ISerializer serializer, IEasyNetQLogger logger, IConventions conventions, ITypeNameSerializer typeNameSerializer)
    : base(connectionFactory, serializer, logger, conventions, typeNameSerializer)
    {
    }

    public override AckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception)
    {
        object deathHeaderObject;
        if (!context.Properties.Headers.TryGetValue("x-death", out deathHeaderObject))
            return AckStrategies.NackWithoutRequeue;

        var deathHeaders = deathHeaderObject as IList;

        if (deathHeaders == null)
            return AckStrategies.NackWithoutRequeue;

        var retries = 0;
        foreach (IDictionary header in deathHeaders)
        {
            var count = int.Parse(header["count"].ToString());
            retries += count;
        }

        if (retries < 3)
            return AckStrategies.NackWithoutRequeue;
        return base.HandleConsumerError(context, exception);
    }
}
Run Code Online (Sandbox Code Playgroud)

你这样替换它:

RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>())
Run Code Online (Sandbox Code Playgroud)

你必须使用它,AdvancedBus所以你必须手动设置一切.

using (var bus = RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>()))
{
    var deadExchange = bus.Advanced.ExchangeDeclare("exchange.text.dead", ExchangeType.Direct);
    var textExchange = bus.Advanced.ExchangeDeclare("exchange.text", ExchangeType.Direct);
    var queue = bus.Advanced.QueueDeclare("queue.text", deadLetterExchange: deadExchange.Name);
    bus.Advanced.Bind(deadExchange, queue, "");
    bus.Advanced.Bind(textExchange, queue, "");

    bus.Advanced.Consume<TextMessage>(queue, (message, info) => HandleTextMessage(message, info));
}
Run Code Online (Sandbox Code Playgroud)

这将死信3次失败.之后,它将转到EasyNetQ提供的默认错误队列以进行错误处理.您可以订阅该队列.

当异常传播出您的使用者方法时,消息将被删除.所以这会引发一封死信.

static void HandleTextMessage(IMessage<TextMessage> textMessage, MessageReceivedInfo info)
{
    throw new Exception("This is a test!");
}
Run Code Online (Sandbox Code Playgroud)

  • 查看实现DeadLetterStrategy的实现,将消息重新提交到队列之间没有延迟。我希望在下一次重新提交之间有一些运行延迟,例如2 * t秒延迟(通过乘以2等因素来延迟时间的常见策略)。没有Thread.Sleep可以完成此操作吗? (2认同)