Java ThreadPoolExecutor策略,带队列的'直接切换'?

6 java threadpool

我希望有一个ThreadPoolExecutor我可以设置a corePoolSize和a的地方maximumPoolSize,会发生什么是队列会立即将任务交给线程池,从而创建新线程,直到它到达maximumPoolSize然后开始添加到队列.

有这样的事吗?如果没有,有没有什么好的理由没有这样的策略?

我真正想要的是将任务提交执行,当它达到一个点,在这一点上,由于拥有太多线程(通过设置maximumPoolSize),它实际上将获得"最差"的性能,它将停止添加新线程并使用它线程池并开始排队,然后如果队列已满,则拒绝.

当负载恢复时,它可以开始拆除未使用的线程返回corePoolSize.

这在我的应用程序中比在http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html中列出的"三个一般策略"更有意义.

Mik*_*ark 3

注意:这些实现有些缺陷且不确定。在使用此代码之前,请阅读整个答案和注释。

如何创建一个工作队列,在执行程序低于最大池大小时拒绝项目,并在达到最大值后开始接受它们?

这依赖于记录的行为:

“如果请求无法排队,则会创建一个新线程,除非这超出了最大池大小,在这种情况下,任务将被拒绝。”

public class ExecutorTest
{
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAXIMUM_POOL_SIZE = 4;
    private static final int KEEP_ALIVE_TIME_MS = 5000;

    public static void main(String[] args)
    {
        final SaturateExecutorBlockingQueue workQueue = 
            new SaturateExecutorBlockingQueue();

        final ThreadPoolExecutor executor = 
            new ThreadPoolExecutor(CORE_POOL_SIZE, 
                    MAXIMUM_POOL_SIZE, 
                    KEEP_ALIVE_TIME_MS, 
                    TimeUnit.MILLISECONDS, 
                    workQueue);

        workQueue.setExecutor(executor);

        for (int i = 0; i < 6; i++)
        {
            final int index = i;
            executor.submit(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }

                    System.out.println("Runnable " + index 
                            + " on thread: " + Thread.currentThread());
                }
            });
        }
    }

    public static class SaturateExecutorBlockingQueue 
        extends LinkedBlockingQueue<Runnable>
    {
        private ThreadPoolExecutor executor;

        public void setExecutor(ThreadPoolExecutor executor)
        {
            this.executor = executor;
        }

        public boolean offer(Runnable e)
        {
            if (executor.getPoolSize() < executor.getMaximumPoolSize())
            {
                return false;
            }
            return super.offer(e);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

注意:您的问题让我感到惊讶,因为我预计您所需的行为是配置了 corePoolSize < MaximumPoolSize 的 ThreadPoolExecutor 的默认行为。但正如您所指出的,ThreadPoolExecutor 的 JavaDoc 明确指出了相反的情况。


想法#2

我想我有一个可能稍微好一点的方法。setCorePoolSize它依赖于编码到 中的方法中的副作用行为ThreadPoolExecutor。这个想法是在工作项排队时暂时有条件地增加核心池大小。当增加核心池大小时,ThreadPoolExecutor将立即产生足够的新线程来执行所有排队的(queue.size())任务。然后我们立即减小核心池的大小,这使得线程池在未来的低活动期间自然收缩。这种方法仍然不是完全确定的(例如,池大小可能会增长到最大池大小以上),但我认为几乎在所有情况下它都比第一种策略更好。

具体来说,我认为这种方法比第一种方法更好,因为:

  1. 它将更频繁地重用线程
  2. 它不会因为竞争而拒绝执行
  3. 我想再次提及,即使在非常少量的使用情况下,第一种方法也会导致线程池增长到其最大大小。在这方面,这种方法应该更加有效。

-

public class ExecutorTest2
{
    private static final int KEEP_ALIVE_TIME_MS = 5000;
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAXIMUM_POOL_SIZE = 4;

    public static void main(String[] args) throws InterruptedException
    {
        final SaturateExecutorBlockingQueue workQueue = 
            new SaturateExecutorBlockingQueue(CORE_POOL_SIZE, 
                    MAXIMUM_POOL_SIZE);

        final ThreadPoolExecutor executor = 
            new ThreadPoolExecutor(CORE_POOL_SIZE, 
                    MAXIMUM_POOL_SIZE, 
                    KEEP_ALIVE_TIME_MS, 
                    TimeUnit.MILLISECONDS, 
                    workQueue);

        workQueue.setExecutor(executor);

        for (int i = 0; i < 60; i++)
        {
            final int index = i;
            executor.submit(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }

                    System.out.println("Runnable " + index 
                            + " on thread: " + Thread.currentThread()
                            + " poolSize: " + executor.getPoolSize());
                }
            });
        }

        executor.shutdown();

        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    public static class SaturateExecutorBlockingQueue 
        extends LinkedBlockingQueue<Runnable>
    {
        private final int corePoolSize;
        private final int maximumPoolSize;
        private ThreadPoolExecutor executor;

        public SaturateExecutorBlockingQueue(int corePoolSize, 
                int maximumPoolSize)
        {
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
        }

        public void setExecutor(ThreadPoolExecutor executor)
        {
            this.executor = executor;
        }

        public boolean offer(Runnable e)
        {
            if (super.offer(e) == false)
            {
                return false;
            }
            // Uncomment one or both of the below lines to increase
            // the likelyhood of the threadpool reusing an existing thread 
            // vs. spawning a new one.
            //Thread.yield();
            //Thread.sleep(0);
            int currentPoolSize = executor.getPoolSize();
            if (currentPoolSize < maximumPoolSize 
                    && currentPoolSize >= corePoolSize)
            {
                executor.setCorePoolSize(currentPoolSize + 1);
                executor.setCorePoolSize(corePoolSize);
            }
            return true;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)