如果需要处理太多数据,我如何让ThreadPoolExecutor命令等待?

Err*_*404 10 java multithreading

我从队列服务器获取数据,我需要处理它并发送确认.像这样的东西:

while (true) {
    queueserver.get.data
    ThreadPoolExecutor //send data to thread
    queueserver.acknowledgement 
Run Code Online (Sandbox Code Playgroud)

我不完全理解线程中发生了什么,但我认为这个程序获取数据,发送线程然后立即确认它.所以即使我有一个每个队列的限制只能有200个未确认的项目,它只会拉到它可以接收它的速度.当我在单个服务器上编写程序时,这很好,但是如果我使用多个工作程序,那么这就成了一个问题,因为线程队列中的项目数量不是它完成的工作的反映,而是它的速度有多快可以从队列服务器获取项目.

如果线程队列充满工作,我能以某种方式使程序等待吗?

Gra*_*ray 20

如果需要处理太多数据,我如何让ThreadPoolExecutor命令等待?

我不是百分百肯定我在这里理解你的问题.当然不是开放式队列,而是可以使用BlockingQueue限制:

BlockingQueue<Date> queue = new ArrayBlockingQueue<Date>(200);
Run Code Online (Sandbox Code Playgroud)

就提交给a的作业而言,您可以创建自己的作业ExecutorService,而不是使用使用无限队列ExecutorService创建的默认作业Executors:

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
              new ArrayBlockingQueue<Runnable>(200));
Run Code Online (Sandbox Code Playgroud)

队列填满后,将导致它拒绝任何提交的新任务.您需要设置RejectedExecutionHandler提交到队列的提交.就像是:

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
           0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will throw an exception
// when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full
      executor.getQueue().put(r);
   }
});
Run Code Online (Sandbox Code Playgroud)

我认为这是Java没有的重大缺失ThreadPoolExecutor.CallerBlocksPolicy.