并行处理严格的消息

mat*_*hes 10 java concurrency priority-queue

在我的JavaEE Web应用程序中,我需要严格按照到达顺序处理传入的消息.我假设我的webapp容器(Tomcat 6)在到达http端口时保留了消息的顺序.

是什么让我头痛,是我在内部处理这些消息的方式.为了改善工作量,我将每个消息的处理附加到ThreadPool,因为这里需要做很多事情,例如XML解析,有时使用外部Web服务来丰富数据.处理完成后,我将消息的java表示推送到复杂的流处理引擎esper.codehaus.org,这是线程安全的.这里,检查不同的模式,其中入口顺序是最高要求,例如现象的阈值超过.

我有想法将每个已处理的消息插入到PriorityQueue中,并在到达时收到优先级ID(在我的Servlet中,每个消息都会递增).问题如下:

从队列中轮询元素(最低ID是队列的头部)以将其插入esper的线程可以跳过ID,因为它不检查丢失的项目.我猜插图效果更好:

在此输入图像描述

对于步骤(1)至(4),一切都按预期工作.但是在步骤(5),QueuePoller检索元素6而不是元素4(稍后在步骤(6)插入).这导致消息顺序:2; 3; 6; 4.

我试图做的是改变轮询队列头部的实现,以遵循严格的ID顺序.意思是,如果下一个ID的元素尚未插入到队列中,则在屏障处等待直到其中.这似乎在前10分钟有效但随后被绞死,可能是由于一个从未插入队列的元素.

过去有过类似问题的人对我有一些暗示吗?

Vla*_*kov 3

查看Disruptor——一个具有严格顺序的高性能队列(先进入——先服务)