RabbitMQ和C#

use*_*237 4 c# rabbitmq

使用RabbitMQ有一种方法可以使用它类似于MSSMQ,其中一个可以从队列中弹出1000条消息,然后插入数据库并从那里继续.

我似乎无法通过Subscription对一个频道进行预测,然后对Subscription中的BasicDeliveryEventArgs进行预测,并使用我想要在给定时间处理的最大消息数进行If语句.

提前谢谢这仍然从队列中获取所有22k消息

using (IConnection connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        channel.QueueDeclare("****", true, false, false, null);

        var subscription = new Subscription(channel, "****", false);
        int maxMessages = 5;
        int i = 0;
        foreach (BasicDeliverEventArgs eventArgs in subscription)
        {
            if (++i == maxMessages)
            {
                Console.WriteLine("Took 5 messages");
                subscription.Ack(eventArgs);
                break;
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Mar*_*tos 11

我假设您希望优化将消息加载到数据库中,通过将这些消息组合成更大的事务而不是每个消息的成本.强制警告这样做意味着大量的消息可以一起失败,即使其中只有一个会导致问题,这就是你如何去做...

在频道上设置QOS:

channel.BasicQos(0, 1000, false);
Run Code Online (Sandbox Code Playgroud)

这将预先获取1000条消息并阻止进一步的流量,直到您确认某事为止.请注意,它不会以1000块为单位进行提取.而是确保在任何时候都预取最多1000条UNACK消息.模拟块传输就像首先处理1000个消息一样简单,然后一次性确认它们.

请参阅此处此处获取比我更具权威性的解释.

还有一点:即使您没有获得1000条消息的配额,您也可能希望在消息可用时立即刷新队列.您应该能够通过queue.BasicGet()foreach循环内部调用直到它干涸,然后将您拥有的任何内容(包括您从中取出的消息subscription)发送到数据库来完成此操作.警告:我自己没试过,所以我可以说垃圾,但我认为它会起作用.这种方法的优点在于它可以立即将消息推送到数据库中,而无需等待整批1000条消息.如果数据库因处理太多小事务而落后,则预取积压将在每个周期之间填充更多.