如何管理M个线程(每个任务1个),同时确保只有N个线程.N <M.在Java中

use*_*427 5 java concurrency multithreading

我在java中有一个任务队列.此队列位于数据库的表中.

我需要:

  • 每个任务仅1个线程
  • 不超过N个线程同时运行.这是因为线程具有数据库交互,我不希望打开一堆数据库连接.

我想我可以这样做:

final Semaphore semaphore = new Semaphore(N);
while (isOnJob) {
    List<JobTask> tasks = getJobTasks();
    if (!tasks.isEmpty()) {
        final CountDownLatch cdl = new CountDownLatch(tasks.size());
        for (final JobTask task : tasks) {
            Thread tr = new Thread(new Runnable() {

                @Override
                public void run() {
                    semaphore.acquire();
                    task.doWork();
                    semaphore.release();
                    cdl.countDown();
                }

            });
        }
        cdl.await();
    }
}
Run Code Online (Sandbox Code Playgroud)

我知道ExecutorService类存在,但我不确定它是否可以使用它.

那么,你认为这是最好的方法吗?或者你能澄清一下ExecutorService如何解决这个问题吗?

最终解决方案

我认为最好的解决方案是这样的:

while (isOnJob) {
    ExecutorService executor = Executors.newFixedThreadPool(N);
    List<JobTask> tasks = getJobTasks();
    if (!tasks.isEmpty()) {
        for (final JobTask task : tasks) {
            executor.submit(new Runnable() {

                @Override
                public void run() {
                    task.doWork();
                }

            });
        }
    }
    executor.shutdown();
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);
}
Run Code Online (Sandbox Code Playgroud)

非常感谢发型师.BTW我正在使用连接池,但对DB的查询非常繁重,我不希望同时拥有不受控制的任务数.

Joã*_*lva 7

你确实可以使用ExecutorService.例如,使用该newFixedThreadPool方法创建一个新的固定线程池.这样,除了缓存线程之外,还可以保证只有n线程同时运行.

这些方面的东西:

private static final ExecutorService executor = Executors.newFixedThreadPool(N);
// ...
while (isOnJob) {
    List<JobTask> tasks = getJobTasks();
    if (!tasks.isEmpty()) {
        List<Future<?>> futures = new ArrayList<Future<?>>();
        for (final JobTask task : tasks) {
                Future<?> future = executor.submit(new Runnable() {    
                        @Override
                        public void run() {
                                task.doWork();
                        }
                });
                futures.add(future);
        }
        // you no longer need to use await
        for (Future<?> fut : futures) {
          fut.get();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

请注意,您不再需要使用锁存器get,如果需要,将等待计算完成.