Rabbit MQ unack消息不会返回队列以供消费者再次处理

the*_*itp 1 c# rabbitmq

我使用RabbitMQ作为我的队列消息服务器,我使用.NET C#client.当从队列处理消息时出现错误时,消息将不会被识别,并且仍然卡在队列中,而不是像我理解的文档那样再次处理.

我不知道我是否遗漏了一些配置或代码块.

我的想法现在是自动手动确认消息,如果错误和手动将此消息再次排队.

我希望有另一个更好的解决方案.

非常感谢.

我的代码

        public void Subscribe(string queueName)
    {
        while (!Cancelled)
        {
            try
            {
                if (subscription == null)
                {
                    try
                    {
                        //try to open connection
                        connection = connectionFactory.CreateConnection();
                    }
                    catch (BrokerUnreachableException ex)
                    {
                        //You probably want to log the error and cancel after N tries, 
                        //otherwise start the loop over to try to connect again after a second or so.
                        log.Error(ex);
                        continue;
                    }

                    //crate chanel
                    channel = connection.CreateModel();
                    // This instructs the channel not to prefetch more than one message
                    channel.BasicQos(0, 1, false);
                    // Create a new, durable exchange
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
                    // Create a new, durable queue
                    channel.QueueDeclare(queueName, true, false, false, null);
                    // Bind the queue to the exchange
                    channel.QueueBind(queueName, exchangeName, queueName);
                    //create subscription
                    subscription = new Subscription(channel, queueName, false);
                }

                BasicDeliverEventArgs eventArgs;
                var gotMessage = subscription.Next(250, out eventArgs);//250 millisecond
                if (gotMessage)
                {
                    if (eventArgs == null)
                    {
                        //This means the connection is closed.
                        DisposeAllConnectionObjects();
                        continue;//move to new iterate
                    }

                    //process message

                   channel.BasicAck(eventArgs.DeliveryTag, false);


                }
            }
            catch (OperationInterruptedException ex)
            {
                log.Error(ex);
                DisposeAllConnectionObjects();
            }
        }

        DisposeAllConnectionObjects();
    }

    private void DisposeAllConnectionObjects()
    {
        //dispose subscription
        if (subscription != null)
        {
            //IDisposable is implemented explicitly for some reason.
            ((IDisposable)subscription).Dispose();
            subscription = null;
        }

        //dipose channel
        if (channel != null)
        {
            channel.Dispose();
            channel = null;
        }

        //check if connection is not null and dispose it
        if (connection != null)
        {
            try
            {
                connection.Dispose();
            }
            catch (EndOfStreamException ex)
            {
                log.Error(ex);
            }
            catch (OperationInterruptedException ex)//handle this get error from dispose connection 
            {
                log.Error(ex);
            }
            catch (Exception ex)
            {
                log.Error(ex);
            }
            connection = null;
        }
    }
Run Code Online (Sandbox Code Playgroud)

Ash*_*Ash 9

我想你可能误解了Rabbitmq的文档.如果消息没有从消费者那里得到消息,那么Rabbit代理会将消息重新排队到队列中以供消费.我不相信你建议的ack'ing方法,然后重新排列一条消息是一个好主意,只会使问题更复杂.

如果您想明确"拒绝"消息,因为消费者在处理消息时遇到问题,那么您可以使用Rabbit的Nack功能.

例如,在catch异常块中,您可以使用:

subscription.Model.BasicNack(eventArgs.DeliveryTag, false, true);
Run Code Online (Sandbox Code Playgroud)

以上内容将通知Rabbit代理重新排列该消息.基本上你传递了传递标签,false表示它不是多条消息,而且确实是重新排队消息.如果要拒绝该消息而不重新排队,则将true更改为false.

此外,您已创建订阅,因此我认为您应该直接在此处执行您的ack.不是通过渠道.

更改:

channel.BasicAck(eventArgs.DeliveryTag, false);
Run Code Online (Sandbox Code Playgroud)

至:

subscription.Ack(); 
Run Code Online (Sandbox Code Playgroud)

这种ack'ing方法更加清晰,因为您可以在订阅对象上保留所有订阅相关内容,而不是弄乱您已订阅的频道.

如果这有帮助请将其标记为答案,很多人似乎忘记了这个社区的重要部分.