如何并行处理MSMQ消息

Gab*_*art 16 parallel-processing msmq .net-4.0 task-parallel-library

我正在编写一个Windows服务来使用MSMQ消息.该服务将具有高活动时段(80k消息很快进入)和长时间不活动(可能是几天没有新消息).

处理消息是非常网络限制的,所以我从并行性中获得了很大的好处.但是在不活动期间,我不希望绑定一堆线程来等待很快就会收到的消息.

MSMQ界面似乎非常关注同步工作流 - 获取一条消息,处理它,获取另一条消息等等.我应该如何构建我的代码,以便在高活动期间我可以利用并行性但不会占用一堆没有活动期间的线程?使用TPL的奖励积分.伪代码将不胜感激.

Bon*_*ver 11

多年来我已经做了很多MSMQ(包括移动实现),你在"同步工作流程"的表征方面是正确的.这并不是说你不能通过TPL将各种消息包络并通过不同的内核处理它们......限制因素是读/写队列......本质上是一个串行操作.例如,您不能一次发送8条消息(具有8个核心的计算机).

我有一个类似的需求(没有使用System.Messaging命名空间)并在我阅读Campbell和Johnson阅读的"与Microsoft.NET并行编程"一书的帮助下解决了这个问题.

查看他们的"并行任务"章节,特别是使用与每线程本地队列协作进行工作处理的全局队列(即使用"工作窃取"算法执行负载平衡的TPL)的部分.在他们的例子之后,我部分地模仿了我的解决方案.我的系统的最终版本在性能上有很大差异(从每秒23条消息到200多条消息).

根据系统从0到80,000所需的时间长短,您需要采用相同的设计并将其分布在多个服务器上(每个服务器具有多个处理器和多个内核).从理论上讲,我的设置需要不到7分钟才能完成所有80K的抛光,因此通过添加第二台计算机,它可以将其减少到约3分20秒等等等.诀窍是工作窃取逻辑.

值得思考的东西......

快速编辑:BTW计算机是Dell T7500工作站,具有双四核Xeons @ 3GHz,24 GB RAM,Windows 7 Ultimate 64位版本.

  • 作为一个后续行动,我自己研究这个并没有全部阅读,但我发现这里提到的这本书可以在MSDN上免费获得.这是与并行任务章节的直接链接:http://msdn.microsoft.com/en-us/library/ff963549.aspx (2认同)

Gab*_*art 6

这是我最后做的简化版本:

while(true) {
    int msgCount = 0;

    Parallel.ForEach(Enumerable.Range(0,20), (i) => {
        MessageQueue queue = new MessageQueue(_queuePath);

        try {
            msg = queue.Receive(TimeSpan.Zero);
            // do work

            Interlocked.Increment(ref msgCount);
        catch(MessageQueueException mqex) {
            if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) {
                return; // nothing in queue
            }
            else throw;
        }           
    }

    if (msgCount < 20) 
        Thread.Sleep(1000); // nothing more to do, take a break
}
Run Code Online (Sandbox Code Playgroud)

所以我尝试一次收到20条消息,计算我收到的消息.对于那些20,我让TPL去镇上.最后,如果我处理的消息少于20条,则队列为空,我再次尝试之前将线程休眠一秒钟.