如何让ThreadPoolExecutor在排队之前将线程增加到最大值?

Gra*_*ray 90 java multithreading blockingqueue threadpoolexecutor

我已经沮丧了一段时间,其默认行为ThreadPoolExecutor支持ExecutorService我们这么多人使用的线程池.引用Javadocs:

如果有多个corePoolSize但运行的maximumPoolSize线程少于maximumPoolSize,则只有在队列已满时才会创建新线程.

这意味着如果您使用以下代码定义线程池,它将永远不会启动第二个线程,因为它LinkedBlockingQueue是无限制的.

ExecutorService threadPool =
   new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
      TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
Run Code Online (Sandbox Code Playgroud)

只有当您有一个有界队列并且队列已满时才会启动核心编号之上的任何线程.我怀疑大量的初级Java多线程程序员并不知道这种行为ThreadPoolExecutor.

现在我有一个特定的用例,这不是最佳的.我正在寻找方法,而不是编写我自己的TPE课程来解决它.

我的要求是针对可能不可靠的第三方回拨的Web服务.

  • 我不想与web-request同步回调,所以我想使用一个线程池.
  • 我通常会得到一些这样的一分钟,所以我不希望有newFixedThreadPool(...)大量的线程,大多数都处于休眠状态.
  • 我经常会遇到这种流量的爆发,我想将线程数量扩大到某个最大值(比方说50).
  • 我需要尽最大努力做所有回调,所以我想排队50以上的任何额外的回调.我不想通过使用a来压倒我的网络服务器的其余部分newCachedThreadPool().

更多线程启动之前,如何解决ThreadPoolExecutor队列需要限制和填充的限制?如何让它排队任务之前启动更多线程?

编辑:

@Flavio提出了使用ThreadPoolExecutor.allowCoreThreadTimeOut(true)核心线程超时并退出的好处.我考虑过这一点,但我仍然想要核心线程功能.如果可能的话,我不希望池中的线程数降到核心大小以下.

Gra*_*ray 44

ThreadPoolExecutor在更多线程启动之前,如何解决队列需要限制和填充的限制问题.

我相信我终于找到了一个有点优雅(也许有点hacky)解决这个限制的方法ThreadPoolExecutor.它包括扩展LinkedBlockingQueue使其返回falsequeue.offer(...)时候,已经有一些任务排队.如果当前线程没有跟上排队任务,TPE将添加其他线程.如果池已经处于最大线程,则将RejectedExecutionHandler调用该池.它是处理程序然后put(...)进入队列.

编写一个offer(...)可以返回falseput()永不阻塞的队列当然很奇怪,这就是hack部分.但这适用于TPE对队列的使用,所以我没有看到这样做有任何问题.

这是代码:

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        /*
         * Offer it to the queue if there is 0 items already queued, else
         * return false so the TPE will add another thread. If we return false
         * and max threads have been reached then the RejectedExecutionHandler
         * will be called which will do the put into the queue.
         */
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            /*
             * This does the actual put into the queue. Once the max threads
             * have been reached, the tasks will then queue up.
             */
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});
Run Code Online (Sandbox Code Playgroud)

有了这个机制,当我向队列提交任务时,ThreadPoolExecutor意志:

  1. 最初将线程数量扩展到核心大小(此处为1).
  2. 提供给队列.如果队列为空,它将排队等待现有线程处理.
  3. 如果队列已经有1个或更多元素,offer(...)则返回false.
  4. 如果返回false,则向上扩展池中的线程数,直到它们达到最大数量(此处为50).
  5. 如果在最大值,那么它会调用 RejectedExecutionHandler
  6. RejectedExecutionHandler随后把任务分成由FIFO顺序第一个可用线程处理队列中.

虽然在上面的示例代码中,队列是无限制的,但您也可以将其定义为有界队列.例如,如果您将容量添加到1000,LinkedBlockingQueue那么它将:

  1. 将线程缩放到最大值
  2. 然后排队直到它满了1000个任务
  3. 然后阻止调用者,直到空间可用于队列.

另外,如果你真的需要offer(...)RejectedExecutionHandler那时使用, 你可以使用该offer(E, long, TimeUnit)方法而不是Long.MAX_VALUE超时.

编辑:

ourQueue.setThreadPoolExecutor(tpe);根据@Ralf的反馈调整了我的方法覆盖.如果它们没有跟上负载,这只会扩大池中的线程数.

编辑:

对这个答案的另一个调整可能是实际询问TPE是否存在空闲线程并且仅在存在该项目时才将其排队.你必须为此创建一个真正的类并在其offer(...)上添加一个方法.

然后您的tpe.getPoolSize() == tpe.getMaximumPoolSize()方法可能类似于:

  1. 检查super.offer(...)在哪种情况下只是打电话tpe.getPoolSize() > tpe.getActiveCount().
  2. 否则,如果super.offer(...)然后调用,false因为似乎有空闲线程.
  3. 否则返回volatilefork另一个线程.

也许这个:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}
Run Code Online (Sandbox Code Playgroud)

请注意,TPE上的get方法很昂贵,因为它们访问getActiveCount()字段或(在这种情况下ThreadPoolExecutor)锁定TPE并遍历线程列表.此外,这里存在竞争条件,可能导致任务不正确地排队,或者在有空闲线程时分叉另一个线程.

  • 难道你在这里得到一个奇怪的是,前几个任务将排队,并且只有在新线程产生之后?例如,如果你的一个核心线程忙于一个长时间运行的任务,并且你调用`execute(runnable)`,那么`runnable`就会被添加到队列中.如果调用`execute(secondRunnable)`,则会将`secondRunnable`添加到队列中.但是现在如果你调用`execute(thirdRunnable)`,那么`thirdRunnable`将在一个新线程中运行.`runnable`和`secondRunnable`只运行一次`thirdRunnable`(或原来的长期运行任务). (3认同)

RTS*_*lio 26

我已经在这个问题上得到了另外两个答案,但我怀疑这个答案是最好的.

它基于当前接受的答案的技术,即:

  1. 覆盖队列的offer()方法(有时)返回false,
  2. 这会导致ThreadPoolExecutor生成新线程或拒绝任务,并且
  3. 设置RejectedExecutionHandler实际排队上拒绝的任务.

问题是什么时候offer()应该返回false.当队列上有几个任务时,当前接受的答案返回false,但正如我在那里的评论中所指出的,这会导致不良影响.或者,如果总是返回false,即使线程在队列中等待,您也将继续生成新线程.

解决方案是使用Java 7 LinkedTransferQueue并进行offer()调用tryTransfer().当有一个等待的消费者线程时,该任务将被传递给该线程.否则,offer()将返回false并且ThreadPoolExecutor将生成一个新线程.

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
Run Code Online (Sandbox Code Playgroud)

  • @RobertTupelo-Schneck 很有魅力!我不知道为什么java中没有类似的开箱即用的东西 (2认同)

Fla*_*vio 25

将核心大小和最大大小设置为相同的值,并允许从池中删除核心线程allowCoreThreadTimeOut(true).

  • 到目前为止,愚蠢设计问题的最佳解决方法. (4认同)
  • +1 是的,我想到了这一点,但我仍然想拥有核心线程功能。我不希望线程池在休眠期间变为 0 个线程。我将编辑我的问题以指出这一点。但非常好的一点。 (2认同)

RTS*_*lio 7

注意:我现在更喜欢并推荐我的其他答案.

这是一个对我来说更简单的版本:每当执行新任务时增加corePoolSize(最大为maximumPoolSize的限制),然后每当a执行新任务时减少corePoolSize(降低到用户指定的"核心池大小"的限制)任务完成.

换句话说,跟踪正在运行或排队的任务的数量,并确保corePoolSize等于任务数量,只要它在用户指定的"核心池大小"和maximumPoolSize之间.

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}
Run Code Online (Sandbox Code Playgroud)

如上所述,该类不支持在构造后更改用户指定的corePoolSize或maximumPoolSize,并且不支持直接或通过remove()或操作工作队列purge().

  • 我给了这个解决方案一些严肃的测试,它显然是最好的.在高度多线程的环境中,当有空闲线程时(由于自由线程TPE.exe执行性质),它仍然有时会排队,但它很少发生,而不是标记为答案的解决方案,其中竞争条件有更多机会发生了,所以每次多线程运行都会发生这种情况. (2认同)

Ral*_*f H 6

我们有一个子类ThreadPoolExecutor,需要额外的creationThreshold和覆盖execute.

public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

也许这也有帮助,但你的当然看起来更有艺术气息......