使用RabbitMQ有一种方法可以使用它类似于MSSMQ,其中一个可以从队列中弹出1000条消息,然后插入数据库并从那里继续.
我似乎无法通过Subscription对一个频道进行预测,然后对Subscription中的BasicDeliveryEventArgs进行预测,并使用我想要在给定时间处理的最大消息数进行If语句.
提前谢谢这仍然从队列中获取所有22k消息
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare("****", true, false, false, null);
var subscription = new Subscription(channel, "****", false);
int maxMessages = 5;
int i = 0;
foreach (BasicDeliverEventArgs eventArgs in subscription)
{
if (++i == maxMessages)
{
Console.WriteLine("Took 5 messages");
subscription.Ack(eventArgs);
break;
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
Mar*_*tos 11
我假设您希望优化将消息加载到数据库中,通过将这些消息组合成更大的事务而不是每个消息的成本.强制警告这样做意味着大量的消息可以一起失败,即使其中只有一个会导致问题,这就是你如何去做...
在频道上设置QOS:
channel.BasicQos(0, 1000, false);
Run Code Online (Sandbox Code Playgroud)
这将预先获取1000条消息并阻止进一步的流量,直到您确认某事为止.请注意,它不会以1000块为单位进行提取.而是确保在任何时候都预取最多1000条UNACK消息.模拟块传输就像首先处理1000个消息一样简单,然后一次性确认它们.
还有一点:即使您没有获得1000条消息的配额,您也可能希望在消息可用时立即刷新队列.您应该能够通过queue.BasicGet()在foreach循环内部调用直到它干涸,然后将您拥有的任何内容(包括您从中取出的消息subscription)发送到数据库来完成此操作.警告:我自己没试过,所以我可以说垃圾,但我认为它会起作用.这种方法的优点在于它可以立即将消息推送到数据库中,而无需等待整批1000条消息.如果数据库因处理太多小事务而落后,则预取积压将在每个周期之间填充更多.