具有有界队列的Java线程池

Ami*_*hum 29 java multithreading threadpool

我使用java.util.concurrentExecutors类来创建一个固定的线程池运行请求处理程序的Web服务器:

static ExecutorService  newFixedThreadPool(int nThreads) 
Run Code Online (Sandbox Code Playgroud)

而描述是:

创建一个线程池,该线程池重用一组在共享无界队列中运行的固定线程.

但是,我正在寻找线程池实现,它将执行完全相同的操作,除了有队列.有这样的实施吗?或者我是否需要为固定线程池实现自己的包装器?

lsc*_*lin 39

您想要做的是新建自己的ExecutorService,可能使用ThreadPoolExecutor.ThreadPoolExecutor有一个构造函数,它接受一个BlockingQueue并获得一个有界的队列,例如正确构造用于边界的ArrayBlockingQueue.您还可以包含RejectedExecutionHandler,以确定队列已满时要执行的操作,或者挂起对阻塞队列的引用并使用offer方法.

这是一个小例子:

BlockingQueue<Runnable> linkedBlockingDeque = new LinkedBlockingDeque<Runnable>(
    100);
ExecutorService executorService = new ThreadPoolExecutor(1, 10, 30,
    TimeUnit.SECONDS, linkedBlockingDeque,
    new ThreadPoolExecutor.CallerRunsPolicy());
Run Code Online (Sandbox Code Playgroud)

  • 你知道为什么没有"PostingBlocksPolicy"阻止任务_can_发布?我想确保(最终)完成工作,因此丢弃或中止策略都不起作用,并且CallerRuns不会飞,因为我使用(单线程)线程池的整个目标是确保工作完成在特定的单线程上. (4认同)

Sur*_*ran 6

创建一个ThreadPoolexecutor并在其中传递合适的BlockingQueue实现.例如,您可以在ThreadPoolExecutor构造函数中传入ArrayBlockingQueue以获得所需的效果.


lan*_*ava 6

我已经用信号量解决了这个问题,我用它来限制提交到ExecutorService.

例如:

int threadCount = 10;
ExecutorService consumerPool = Executors.newFixedThreadPool(threadCount);

// set the permit count greater than thread count so that we 
// build up a limited buffer of waiting consumers
Semaphore semaphore = new Semaphore(threadCount * 100); 

for (int i = 0; i < 1000000; ++i) {
    semaphore.acquire(); // this might block waiting for a permit 
    Runnable consumer = () -> {
       try {
          doSomeWork(i);
       } finally {
          semaphore.release(); // release a permit 
       }
    };
    consumerPool.submit(consumer);
}
Run Code Online (Sandbox Code Playgroud)