多个阻塞队列,单个消费者

Ale*_*lex 19 java concurrency

我有多个BlockingQueues包含要发送的消息.是否可以减少消费者而不是队列?我不想循环遍历队列并继续轮询它们(忙等待)并且我不希望每个队列都有一个线程.相反,我希望有一个线程在任何队列上有消息时被唤醒.

min*_*rus 8

您可以做的一个技巧是拥有一个队列队列.所以你要做的是拥有一个所有线程都订阅的阻塞队列.然后,当您将某些内容排入某个BlockingQueues时,您还会在此单个队列中将阻塞队列排入队列.所以你会有类似的东西:

BlockingQueue<WorkItem> producers[] = new BlockingQueue<WorkItem>[NUM_PRODUCERS];
BlockingQueue<BlockingQueue<WorkItem>> producerProducer = new BlockingQueue<BlockingQueue<WorkItem>>();
Run Code Online (Sandbox Code Playgroud)

然后当你得到一个新的工作项:

void addWorkItem(int queueIndex, WorkItem workItem) {
    assert queueIndex >= 0 && queueIndex < NUM_PRODUCERS : "Pick a valid number";
    //Note: You may want to make the two operations a single atomic operation
    producers[queueIndex].add(workItem);
    producerProducer.add(producers[queueIndex]);
}
Run Code Online (Sandbox Code Playgroud)

现在您的消费者可以全部阻止producerProducer.我不确定这种策略会有多么有价值,但它确实能达到你想要的效果.


Mar*_*ios 8

LinkedBlockingMultiQueue做你所要求的.它不允许消费者阻止任意BlockingQueues,但可以从单个"多队列"创建"子队列"并实现相同的效果.生产者提供子队列,消费者可以阻止自己轮询单个多队列,等待任何元素.

它还支持优先级,即在考虑其他队列之前从某些队列中获取元素.

例:

LinkedBlockingMultiQueue<Int, String> q = new LinkedBlockingMultiQueue<>();
q.addSubQueue(1 /* key */, 10 /* priority */);
q.addSubQueue(2 /* key */, 10 /* priority */);
LinkedBlockingMultiQueue<Int, String>.SubQueue sq1 = q.getSubQueue(1);
LinkedBlockingMultiQueue<Int, String>.SubQueue sq2 = q.getSubQueue(2);
Run Code Online (Sandbox Code Playgroud)

然后你可以提供和投票:

sq1.offer("x1");
q.poll(); // "x1"
sq2.offer("x2");
q.poll(); // "x2"
Run Code Online (Sandbox Code Playgroud)

免责声明:我是图书馆的作者.