在具有等待任务时动态调整java.util.concurrent.ThreadPoolExecutor的大小

Edw*_*ern 18 java concurrency multithreading

我正在使用a java.util.concurrent.ThreadPoolExecutor来并行处理多个项目.尽管线程本身工作正常,但由于线程中发生的操作,我们有时会遇到其他资源限制,这使我们想要调低池中线程的数量.

我想知道在线程实际工作时是否有办法拨打线程数.我知道你可以调用setMaximumPoolSize()和/或setCorePoolSize(),但是这些只会在线程空闲后调整池的大小,但是在队列中没有任务等待之前它们不会变为空闲.

Sco*_*Coy 36

你绝对可以.调用setCorePoolSize(int)将更改池的核心大小.对此方法的调用是线程安全的,并覆盖提供给构造函数的设置ThreadPoolExecutor.如果要修剪池大小,其余线程将在其当前作业队列完成后关闭(如果它们处于空闲状态,它们将立即关闭).如果要增加池大小,将尽快分配新线程.分配新线程的时间范围没有记录 - 但在实现中,每次调用该execute方法时都会执行新线程的分配.

要将其与运行时可调参数作业服务器场配对,您可以将此属性(通过包装器或使用动态MBean导出器)公开为读写JMX属性,以创建一个相当不错的,即时可调的批处理器.

要在运行时强制减小池大小(这是您的请求),您必须ThreadPoolExecutor对该beforeExecute(Thread,Runnable)方法进行子类化并添加中断.中断线程不是一个充分的中断,因为它只与等待状态交互,并且在处理期间ThreadPoolExecutor任务线程不会进入可中断状态.

我最近遇到了同样的问题,试图在执行所有提交的任务之前强制终止线程池.为了实现这一点,我通过在将UncaughtExceptionHandler线程替换为期望我的特定异常并且丢弃它的线程之后抛出运行时异常来中断线程.

/**
 * A runtime exception used to prematurely terminate threads in this pool.
 */
static class ShutdownException
extends RuntimeException {
    ShutdownException (String message) {
        super(message);
    }
}

/**
 * This uncaught exception handler is used only as threads are entered into
 * their shutdown state.
 */
static class ShutdownHandler 
implements UncaughtExceptionHandler {
    private UncaughtExceptionHandler handler;

    /**
     * Create a new shutdown handler.
     *
     * @param handler The original handler to deligate non-shutdown
     * exceptions to.
     */
    ShutdownHandler (UncaughtExceptionHandler handler) {
        this.handler = handler;
    }
    /**
     * Quietly ignore {@link ShutdownException}.
     * <p>
     * Do nothing if this is a ShutdownException, this is just to prevent
     * logging an uncaught exception which is expected.  Otherwise forward
     * it to the thread group handler (which may hand it off to the default
     * uncaught exception handler).
     * </p>
     */
    public void uncaughtException (Thread thread, Throwable throwable) {
        if (!(throwable instanceof ShutdownException)) {
            /* Use the original exception handler if one is available,
             * otherwise use the group exception handler.
             */
            if (handler != null) {
                handler.uncaughtException(thread, throwable);
            }
        }
    }
}
/**
 * Configure the given job as a spring bean.
 *
 * <p>Given a runnable task, configure it as a prototype spring bean,
 * injecting any necessary dependencices.</p>
 *
 * @param thread The thread the task will be executed in.
 * @param job The job to configure.
 *
 * @throws IllegalStateException if any error occurs.
 */
protected void beforeExecute (final Thread thread, final Runnable job) {
    /* If we're in shutdown, it's because spring is in singleton shutdown
     * mode.  This means we must not attempt to configure the bean, but
     * rather we must exit immediately (prematurely, even).
     */
    if (!this.isShutdown()) {
        if (factory == null) {
            throw new IllegalStateException(
                "This class must be instantiated by spring"
                );
        }

        factory.configureBean(job, job.getClass().getName());
    }
    else {
        /* If we are in shutdown mode, replace the job on the queue so the
         * next process will see it and it won't get dropped.  Further,
         * interrupt this thread so it will no longer process jobs.  This
         * deviates from the existing behavior of shutdown().
         */
        workQueue.add(job);

        thread.setUncaughtExceptionHandler(
            new ShutdownHandler(thread.getUncaughtExceptionHandler())
            );

        /* Throwing a runtime exception is the only way to prematurely
         * cause a worker thread from the TheadPoolExecutor to exit.
         */
        throw new ShutdownException("Terminating thread");
    }
}
Run Code Online (Sandbox Code Playgroud)

在您的情况下,您可能想要创建一个没有许可证的信号量(仅用作线程安全计数器),并且在关闭线程时向其释放许多许可,这些许可对应于先前核心池大小的增量和新池大小(要求您覆盖该setCorePoolSize(int)方法).这将允许您在当前任务完成后终止线程.

private Semaphore terminations = new Semaphore(0);

protected void beforeExecute (final Thread thread, final Runnable job) {
    if (terminations.tryAcquire()) {
        /* Replace this item in the queue so it may be executed by another
         * thread
         */
        queue.add(job);

        thread.setUncaughtExceptionHandler(
            new ShutdownHandler(thread.getUncaughtExceptionHandler())
            );

        /* Throwing a runtime exception is the only way to prematurely
         * cause a worker thread from the TheadPoolExecutor to exit.
         */
        throw new ShutdownException("Terminating thread");
    }
}

public void setCorePoolSize (final int size) {
    int delta = getActiveCount() - size;

    super.setCorePoolSize(size);

    if (delta > 0) {
        terminations.release(delta);
    }
}
Run Code Online (Sandbox Code Playgroud)

这应该中断n个线程f(n)= active - request.如果有任何问题,ThreadPoolExecutors分配策略相当持久.它使用finally保证执行的块保持提前终止.因此,即使您终止太多线程,它们也会重新填充.

  • 是的,我知道您可以进行更改并在线程空闲后使其生效。然而,我发现当线程完成其任务时不会发生“空闲”,除非没有其他任务在等待。我在问是否有任何方法可以让更改立即生效,这样即使有等待的任务,它们也将由新的所需线程数处理(如我的示例中,我们想立即拨下正在使用的并发资源量)。 (2认同)

Ste*_*ide 6

解决方案是耗尽ThreadPoolExecutor队列,根据需要设置ThreadPoolExecutor大小,然后在其他线程结束时逐个添加线程.在ThreadPoolExecutor类中排空队列的方法是私有的,因此您必须自己创建它.这是代码:

/**
 * Drains the task queue into a new list. Used by shutdownNow.
 * Call only while holding main lock.
 */
public static List<Runnable> drainQueue() {
    List<Runnable> taskList = new ArrayList<Runnable>();
    BlockingQueue<Runnable> workQueue = executor.getQueue();
    workQueue.drainTo(taskList);
    /*
     * If the queue is a DelayQueue or any other kind of queue
     * for which poll or drainTo may fail to remove some elements,
     * we need to manually traverse and remove remaining tasks.
     * To guarantee atomicity wrt other threads using this queue,
     * we need to create a new iterator for each element removed.
     */
    while (!workQueue.isEmpty()) {
        Iterator<Runnable> it = workQueue.iterator();
        try {
            if (it.hasNext()) {
                Runnable r = it.next();
                if (workQueue.remove(r))
                    taskList.add(r);
            }
        } catch (ConcurrentModificationException ignore) {
        }
    }
    return taskList;
}
Run Code Online (Sandbox Code Playgroud)

在调用此方法之前,您需要获取然后释放主锁.为此,您需要使用java反射,因为字段"mainLock"是私有的.再次,这是代码:

private Field getMainLock() throws NoSuchFieldException {
    Field mainLock = executor.getClass().getDeclaredField("mainLock");
    mainLock.setAccessible(true);
    return mainLock;
}
Run Code Online (Sandbox Code Playgroud)

"执行者"是你的ThreadPoolExecutor.

现在你需要锁定/解锁方法:

public void lock() {
    try {
        Field mainLock = getMainLock();
        Method lock = mainLock.getType().getDeclaredMethod("lock", (Class[])null);
        lock.invoke(mainLock.get(executor), (Object[])null);
    } catch {
        ...
    } 
}

public void unlock() {
    try {
        Field mainLock = getMainLock();
        mainLock.setAccessible(true);
        Method lock = mainLock.getType().getDeclaredMethod("unlock", (Class[])null);
        lock.invoke(mainLock.get(executor), (Object[])null);
    } catch {
        ...
    }  
}
Run Code Online (Sandbox Code Playgroud)

最后,您可以编写"setThreadsNumber"方法,它可以增加和减少ThreadPoolExecutor大小:

public void setThreadsNumber(int intValue) {
    boolean increasing = intValue > executor.getPoolSize();
    executor.setCorePoolSize(intValue);
    executor.setMaximumPoolSize(intValue);
    if(increasing){
        if(drainedQueue != null && (drainedQueue.size() > 0)){
            executor.submit(drainedQueue.remove(0));
        }
    } else {
        if(drainedQueue == null){
            lock();
            drainedQueue = drainQueue();
            unlock();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

注意:显然,如果执行N个并行线程并将此数字更改为N-1,则所有N个线程将继续运行.当第一个线程结束时,不会执行新线程.从现在开始,并行线程的数量将是您选择的数量.


Tim*_*der 5

据我所知,这是一个不太干净的方式是不可能的.

您可以实现beforeExecute方法来检查一些布尔值并强制线程暂时停止.请记住,它们将包含一个在重新启用之前不会执行的任务.

或者,您可以实现afterExecute以在饱和时抛出RuntimeException.这将有效地导致Thread死亡,并且由于Executor将高于最大值,因此不会创建新的.

我不建议你这样做.相反,尝试找到一些其他方法来控制导致问题的任务的并发执行.可能通过在具有更有限数量的工作者的单独线程池中执行它们.