标签: forkjoinpool

ForkJoinPool重置线程中断状态

我刚刚在取消ForkJoinPool返回的Future时注意到以下现象.给出以下示例代码:

ForkJoinPool pool = new ForkJoinPool();
Future<?> fut = pool.submit(new Callable<Void>() {

  @Override
  public Void call() throws Exception {
    while (true) {
      if (Thread.currentThread().isInterrupted()) { // <-- never true
        System.out.println("interrupted");
        throw new InterruptedException();
      }
    }
  }
});

Thread.sleep(1000);
System.out.println("cancel");
fut.cancel(true);
Run Code Online (Sandbox Code Playgroud)

该程序永远不会打印interrupted.ForkJoinTask#cancel(boolean)的文档说:

mayInterruptIfRunning - 此值在默认实现中无效,因为中断不用于控制取消.

如果ForkJoinTasks忽略了中断,你应该如何检查提交给ForkJoinPool的Callables中的取消?

java multithreading java.util.concurrent forkjoinpool

4
推荐指数
1
解决办法
884
查看次数

ForkJoinPool.invoke()和ForkJoinTask.invoke()或compute()

我正在阅读Java ForkJoin框架.通过不直接调用(例如)invoke()的实现,但实例化和调用有什么额外的好处?当我们称这两种方法全部被调用时到底发生了什么?ForkJoinTaskRecursiveTaskForkJoinPoolpool.invoke(task)invoke

从源代码来看,似乎如果recursiveTask.invoke被调用,它将以托管线程池的方式调用它exec并最终调用它compute.因此,为什么我们有这个成语更令人困惑pool.invoke(task).

我写了一些简单的代码来测试性能差异,但我没有看到任何.也许测试代码错了?见下文:

public class MyForkJoinTask extends RecursiveAction {

    private static int totalWorkInMillis = 20000;
    protected static int sThreshold = 1000;

    private int workInMillis;


    public MyForkJoinTask(int work) {
        this.workInMillis = work;
    }

    // Average pixels from source, write results into destination.
    protected void computeDirectly() {
        try {

            ForkJoinTask<Object> objectForkJoinTask = new ForkJoinTask<>();
            Thread.sleep(workInMillis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void …
Run Code Online (Sandbox Code Playgroud)

java concurrency java.util.concurrent fork-join forkjoinpool

4
推荐指数
1
解决办法
3968
查看次数

Fork-Join线程池中的长时间运行任务是否可以阻止所有线程?

我一直在学习Java 8的并行流,这篇文章出现在Google搜索的第一页上.我无法理解文章中的这一特定内容

...所有并行流都使用公共fork-join线程池,如果您提交长时间运行的任务,则可以有效地阻止池中的所有线程.

我无法弄清楚长时间运行的任务如何阻止池中的所有其他线程.

java multithreading java-8 forkjoinpool java-stream

4
推荐指数
1
解决办法
654
查看次数

如何在ForkJoinPool中阻止队列?

我需要在其队列已满时阻止ForkJoinPool上的线程.这可以在标准的ThreadPoolExecutor中完成,例如:

private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            5000L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
Run Code Online (Sandbox Code Playgroud)

我知道,ForkJoinPool中有一些Dequeue,但是我无法通过它访问它.

更新:请参阅下面的答案.

java multithreading fork-join blockingqueue forkjoinpool

3
推荐指数
1
解决办法
1918
查看次数

ForkJoinPool vs Plain Recursion

在阅读了有关ForkJoinPool之后,我尝试了一个实验来测试ForkJoinPool与普通递归相比实际有多快.

我递归地计算了一个文件夹中的文件数,而且对我来说,普通的递归方式表现得更好 ForkJoinPool

这是我的代码.

递归任务

class DirectoryTask extends RecursiveTask<Long> {

    private Directory directory;

    @Override
    protected Long compute() {
        List<RecursiveTask<Long>> forks = new ArrayList<>();
        List<Directory> directories = directory.getDirectories();
        for (Directory directory : directories) {
            DirectoryTask directoryTask = new DirectoryTask(directory);
            forks.add(directoryTask);
            directoryTask.fork();
        }
        Long count = directory.getDoumentCount();
        for (RecursiveTask<Long> task : forks) {
            count += task.join();
        }
        return count;
    }
}
Run Code Online (Sandbox Code Playgroud)

普通递归

private static Long getFileCount(Directory directory) {
        Long recursiveCount = 0L;
        List<Directory> directories = directory.getDirectories();
        if (null …
Run Code Online (Sandbox Code Playgroud)

java concurrency recursion forkjoinpool

3
推荐指数
2
解决办法
355
查看次数

ForkJoinPool#awaitQuiescence 实际上是如何工作的?

我有下一个实现RecursiveAction,这个类的单一目的 - 是从 0 到 9 打印,但如果可能的话,从不同的线程打印:

public class MyRecursiveAction extends RecursiveAction {
    private final int num;

    public MyRecursiveAction(int num) {
        this.num = num;
    }

    @Override
    protected void compute() {
        if (num < 10) {
            System.out.println(num);
            new MyRecursiveAction(num + 1).fork();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我认为调用awaitQuiescence将使当前线程等待所有任务(提交和分叉)完成:

public class Main {
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        forkJoinPool.execute(new MyRecursiveAction(0));
        System.out.println(forkJoinPool.awaitQuiescence(5, TimeUnit.SECONDS) ? "tasks" : "time");
    }
}
Run Code Online (Sandbox Code Playgroud)

但我并不总是得到正确的结果,而不是打印 10 次,打印 0 到 10 次。 …

java-8 forkjoinpool

3
推荐指数
1
解决办法
97
查看次数

ForkJoinPool 大小动态增加?

相关:ParallelStream 上的 CompletableFuture 被批处理并且运行速度比顺序流慢?

我正在研究通过 parallelStream 和 CompletableFutures 并行化网络调用的不同方法。因此,我遇到过这种情况,Java 的 parallelStream 使用的 ForkJoinPool.commonPool() 的大小动态增长,从 ~ #Cores 到最大值 64。

爪哇细节: $ java -version

openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.10+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.10+9, mixed mode)
Run Code Online (Sandbox Code Playgroud)

显示这种行为的代码如下(完整的可执行代码在这里

openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.10+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.10+9, mixed mode)
Run Code Online (Sandbox Code Playgroud)

示例输出:

16:07:17.443 [pool-2-thread-7] INFO  generalworks.parallelism.DummyProcess -  44 going to sleep. poolsize: 11
16:07:17.443 [pool-2-thread-9] INFO  generalworks.parallelism.DummyProcess …
Run Code Online (Sandbox Code Playgroud)

java parallel-processing multithreading forkjoinpool java-stream

3
推荐指数
1
解决办法
112
查看次数

什么是 ParallelStream 队列行为?

我正在使用parallelStream 并行上传一些文件,有些是大文件,有些是小文件。我注意到并非所有工人都被使用。

一开始一切都运行良好,所有线程都被使用(我将并行度选项设置为 16)。然后在某一时刻(一旦到达更大的文件),它只使用一个线程

简化代码:

files.parallelStream().forEach((file) -> {
    try (FileInputStream fileInputStream = new FileInputStream(file)) {
                IDocumentStorageAdaptor uploader = null;

                try {
                    logger.debug("Adaptors before taking: " + uploaderPool.size());
                    uploader = uploaderPool.take();
                    logger.debug("Took an adaptor!");
                    logger.debug("Adaptors after taking: " + uploaderPool.size());
                    uploader.addNewFile(file);
                } finally {
                    if (uploader != null) {
                        logger.debug("Adding one back!");
                        uploaderPool.put(uploader);
                        logger.debug("Adaptors after putting: " + uploaderPool.size());
                    }
                }
            } catch (InterruptedException | IOException e) {
                throw new UploadException(e);
            }
});
Run Code Online (Sandbox Code Playgroud)

uploaderPool 是一个 ArrayBlockingQueue。日志:

[ForkJoinPool.commonPool-worker-8] - Adaptors before taking: …
Run Code Online (Sandbox Code Playgroud)

java multithreading forkjoinpool

2
推荐指数
1
解决办法
774
查看次数

如果我在并行流中使用lambda会发生死锁但是如果我使用匿名类而不会发生这种情况?

以下代码导致死锁(在我的电脑上):

public class Test {
    static {
        final int SUM = IntStream.range(0, 100)
                .parallel()
                .reduce((n, m) -> n + m)
                .getAsInt();
    }

    public static void main(String[] args) {
        System.out.println("Finished");
    }
}
Run Code Online (Sandbox Code Playgroud)

但是,如果我reduce用匿名类替换lambda参数,它不会导致死锁:

public class Test {
    static {
        final int SUM = IntStream.range(0, 100)
                .parallel()
                .reduce(new IntBinaryOperator() {
                    @Override
                    public int applyAsInt(int n, int m) {
                        return n + m;
                    }
                })
                .getAsInt();
    }

    public static void main(String[] args) {
        System.out.println("Finished");
    }
}
Run Code Online (Sandbox Code Playgroud)

你能解释一下这种情况吗?

PS

我发现代码(与以前有点不同):

public class …
Run Code Online (Sandbox Code Playgroud)

java deadlock initialization forkjoinpool java-stream

2
推荐指数
1
解决办法
232
查看次数

如何在 Spring Boot @Async 中使用 ForkJoinPool?

我想在我的 Spring Boot 项目中使用 ForkJoinPool 和 @Async 注释,比如 ThreadPoolTask​​Executor

例如 :-

 @Bean("threadPoolTaskExecutor")
    public TaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(20);
        executor.setMaxPoolSize(1000);
        executor.setThreadNamePrefix("Async-");
        return executor;
    }
Run Code Online (Sandbox Code Playgroud)

我在我的代码中使用https://dzone.com/articles/spring-boot-creating-asynchronous-methods-using-作为这个链接,但我想像这样使用 ForkJoinPool 。

java spring java.util.concurrent forkjoinpool spring-boot

2
推荐指数
1
解决办法
7302
查看次数