Java - ExecutorService 有最大大小

Bas*_*ast 2 java multithreading

有没有办法通过一个巨大的数据库并并行应用一些工作来获取条目?我尝试使用 ExecutorService,但我们必须关闭()才能知道池大小......

所以我最好的解决方案是:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestCode
{
private static List<String> getIds(int dbOffset, int nbOfArticlesPerRequest) 
{
    return Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29");
}

public static void main(String args[]) throws Exception
{
    int dbOffset = 0;
    int nbOfArticlesPerRequest = 100;
    int MYTHREADS = 10;
    int loopIndex = 0;
    boolean bContinue=true;
    Runnable worker;



    while(bContinue) // in this loop we'll constantly fill the pool list
    {
        loopIndex++;
        ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // NOT IDEAL, BUT EXECUTORSERVICE CANNOT BE REUSED ONCE SHUTDOWN...

        List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
        for(String id: ids) {
            worker = new MyRunnable(id);
            executor.execute(worker);
        }

        executor.shutdown();
        while (!executor.isTerminated()) {
            System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
                    " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
            );
            TimeUnit.MILLISECONDS.sleep(500);
        }

        if(loopIndex>=3) {
            System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
            bContinue = false;
        }
        dbOffset+=nbOfArticlesPerRequest;
    }
}



public static class MyRunnable implements Runnable {

    private final String id;

    MyRunnable(String id) {
        this.id = id;
    }

        @Override
        public void run()
        {
            System.out.println("Thread '"+id+"' started");
            try {
                TimeUnit.MILLISECONDS.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Thread '"+id+"' stopped");
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这工作正常,但缺点是在循环的每一端我都需要等待最后一个线程完成。

例如:当只有 3 个线程正在运行时......

为了解决这个问题,我做了以下事情,但是“安全”/正确吗?

顺便说一句:有没有办法知道队列中有多少任务/线程?

    int dbOffset = 0;
    int nbOfArticlesPerRequest = 5; //100;
    int MYTHREADS = 2;
    int loopIndex = 0;

    ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // **HERE IT WOULD BE A GLOBAL VARIABLE**
       while(bContinue) // in this loop we'll constantly fill the pool list
        {
            loopIndex++;

            List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
             for(String id: ids) {
                    worker = new MyRunnable(id);
                    executor.execute(worker);
             }

            while (!executor.isTerminated() && ((ThreadPoolExecutor) executor).getActiveCount() >= MYTHREADS) {
                System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
                        " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
                );
                TimeUnit.MILLISECONDS.sleep(500);
            }

            if(loopIndex>=3) {
                System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
                bContinue = false;
            }
            dbOffset+=nbOfArticlesPerRequest;
        }

    executor.shutdown();
    // Wait until all threads are finish
    while (!executor.isTerminated()) {
        System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
                " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
        );
        TimeUnit.MILLISECONDS.sleep(500);
    }
Run Code Online (Sandbox Code Playgroud)

编辑:

我尝试启动 1 或 10 百万个任务,所以(我假设)我不能将它们全部放入队列......这就是为什么我使用全局执行器以便能够始终在队列中拥有一些线程(为此我不能关闭执行器,否则它不再可用)。

最新代码版本:

    int dbOffset = 0;
    int nbOfArticlesPerRequest = 5; //100;
    int MYTHREADS = 2;
    int loopIndex = 0;

    ThreadPoolExecutor executorPool = new ThreadPoolExecutor(MYCORES, MYCORES, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // **HERE IT WOULD BE A GLOBAL VARIABLE**
       while(bContinue) // in this loop we'll constantly fill the pool list
        {
            loopIndex++;

            List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
             for(String id: ids) {
                    worker = new MyRunnable(id);
                    executorPool.execute(worker);
             }

            while (executorPool.getActiveCount() >= MYTHREADS  || executorPool.getQueue().size()> Math.max(1, MYTHREADS -2)) 
            {
                System.out.println("Pool size is now " + executorPool.getActiveCount()+
                                        " - queue size: "+ executorPool.getQueue().size()
                );

                if(executorPool.getQueue().size() <= Math.max(1, MYCORES-2)) {
                    System.out.println("Less than "+Math.max(1, MYCORES-2)+" threads in queue ---> fill the queue");
                    break;
                }

                TimeUnit.MILLISECONDS.sleep(2000);
            }

            if(loopIndex>=3) {
                System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
                bContinue = false;
            }
            dbOffset+=nbOfArticlesPerRequest;
        }

    executorPool.shutdown();
    // Wait until all threads are finish
    while (!executorPool.isTerminated()) {
        System.out.println("Pool size is now " + executorPool.getActiveCount()+
                " - queue size: "+ executorPool.getQueue().size()
        );
        TimeUnit.MILLISECONDS.sleep(500);
    }
Run Code Online (Sandbox Code Playgroud)

提前致谢

Dav*_*iro 7

更新

现在我很清楚,您主要担心的是您不能一次提交 1000 万个任务。

不要害怕,您可以将它们全部提交给 executor。并行运行的实际任务数量受底层线程池大小的限制。也就是说,如果池大小为 2,则此时只有两个任务正在执行,其余任务则坐在队列中等待空闲线程。

默认情况下,Executors.newFixedThreadPool()会创建一个队列Integer.MAX_VALUE大小的 Executor ,因此您的数百万个任务将适合其中。


您可以使用ExecutorService.submit()返回Future. 然后您可以检查您的 Future 任务的状态(即使用isDone(),isCancelled()方法)。

Executor 通常是您不想明确关闭的东西,并且存在于整个应用程序生命周期中。使用这种方法,您无需关闭即可了解有多少待处理的任务。

List<Future<?>> tasks = new ArrayList<>();
for (String id : ids) {
    Future<?> task = executorService.submit(() -> {
        // do work
    });
    tasks.add(task);
}

long pending = tasks.stream().filter(future -> !future.isDone()).count();
System.out.println(pending + " task are still pending");
Run Code Online (Sandbox Code Playgroud)

此外,请注意任务和线程不是可互换的术语。在您的情况下,执行程序具有固定数量的线程。您可以提交比这更多的任务,但其余的将位于执行程序的队列中,直到有一个空闲线程来运行该任务。