mem*_*und 2 java queue concurrency
我有多个生产者线程同时将对象添加到共享队列。
我想创建一个从共享队列中读取数据以进行进一步数据处理(数据库批量插入)的单线程使用者。
问题:我只想从队列中分块获取数据,以便在批量插入期间获得更好的性能。因此,我必须以某种方式检测队列中有多少项目,然后从队列中取出所有这些项目,然后再次清空队列。
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();
ExecutorService pes = Executors.newFixedThreadPool(4);
ExecutorService ces = Executors.newFixedThreadPool(1);
pes.submit(new Producer(sharedQueue, 1));
pes.submit(new Producer(sharedQueue, 2));
pes.submit(new Producer(sharedQueue, 3));
pes.submit(new Producer(sharedQueue, 4));
ces.submit(new Consumer(sharedQueue, 1));
class Producer implements Runnable {
run() {
...
sharedQueue.put(obj);
}
}
class Consumer implements Runnable {
run() {
...
sharedQueue.take();
}
}
Run Code Online (Sandbox Code Playgroud)
消费者的问题:如何轮询共享队列,等待队列中有 X 个项目,然后取出所有项目并同时清空队列(这样消费者可以再次开始轮询和等待)?
我愿意接受任何建议,并且不一定与上面的代码绑定。
小智 5
我最近开发了这个实用程序,如果队列元素没有达到批量大小,则使用刷新超时来批处理 BlockingQueue 元素。它还支持使用多个实例来阐述同一组数据的扇出模式:
// Instantiate the registry
FQueueRegistry registry = new FQueueRegistry();
// Build FQueue consumer
registry.buildFQueue(String.class)
.batch()
.withChunkSize(5)
.withFlushTimeout(1)
.withFlushTimeUnit(TimeUnit.SECONDS)
.done()
.consume(() -> (broadcaster, elms) -> System.out.println("elms batched are: "+elms.size()));
// Push data into queue
for(int i = 0; i < 10; i++){
registry.sendBroadcast("Sample"+i);
}
Run Code Online (Sandbox Code Playgroud)
更多信息在这里!
https://github.com/fulmicotone/io.fulmicotone.fqueue