如何在ForkJoinPool中阻止队列?

Iva*_*lin 3 java multithreading fork-join blockingqueue forkjoinpool

我需要在其队列已满时阻止ForkJoinPool上的线程.这可以在标准的ThreadPoolExecutor中完成,例如:

private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            5000L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
Run Code Online (Sandbox Code Playgroud)

我知道,ForkJoinPool中有一些Dequeue,但是我无法通过它访问它.

更新:请参阅下面的答案.

Iva*_*lin 8

经过一番研究,我很乐意回答这个问题:

原因: 由于以下原因,ForkJoinPool的实现中没有这样的选项.大多数juc Executors都假设单个并发队列和许多线程.当多个线程读取/写入队列时,这会导致队列争用并降低性能.因此,这种方法不是很可扩展 - >队列上的高争用可以生成大量的上下文切换和CPU业务.

实现: 在ForkJoinPool中,每个线程都有一个由数组支持的单独的双端队列(Deque).为了最大限度地减少争用,工作窃取发生在双端队列的尾部,而任务提交发生在当前线程(工作者)的头部.尾部包含最大部分的工作.换句话说,通过另一个工作线程从尾部窃取可以最大限度地减少与其他工作人员交互的次数 - >争用更少,整体性能更好.

解决方法的想法: 有全局提交队列.来自非FJ线程的提交进入提交队列(工作人员承担这些任务).还有上面提到的工人队列.

队列的最大大小受数量限制:

   /**
     * Maximum size for queue arrays. Must be a power of two less
     * than or equal to 1 << (31 - width of array entry) to ensure
     * lack of wraparound of index calculations, but defined to a
     * value a bit less than this to help users trap runaway
     * programs before saturating systems.
     */
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
Run Code Online (Sandbox Code Playgroud)

当队列已满时,将抛出未经检查的异常:

RejectedExecutionException("Queue capacity exceeded")
Run Code Online (Sandbox Code Playgroud)

这在javadocs中有描述.

(另请参阅ThreadPool的构造函数UncaughtExceptionHandler)

我倾向于声称当前的实现没有这样的机制,这应该由我们在消费API中实现.

例如,这可以按如下方式完成:

  1. 实现指数后退逻辑,该逻辑尝试通过增加下一次重试的时间间隔来定期重新提交任务.要么..
  2. 编写一个定期检查submissionQueue大小的限制器(请参阅参考资料ForkJoinPool.getQueuedSubmissionCount()).

以下是ForkJoinPool的官方JSR-166E java代码以获取更多信息.

  • 这个答案中是否隐藏着我看不到的解决方案,或者更像是讨论?(我遇到了同样的问题) (2认同)