标签: forkjoinpool

ForkJoinPool 中递归任务和递归动作的区别

我们可以向 提交两种类型的任务forkJoinPool。一个是RecursiveAction,另一个是RecursiveTask

它们之间有什么区别?

java multithreading forkjoinpool

7
推荐指数
1
解决办法
3155
查看次数

使用 ForkJoinPool Java 找不到 JAXB 类

我使用 jaxb 进行解组,但是当我使用 ForkJoinPoolexecute() 方法时,我得到一个“java.log.ClassNotFoundException:com.sun.xml.internal.bind.v2.ContextFactory”,但我确信存在在我的运行时的类路径中,因为当我不使用 ForkJoinPool 时它可以正常工作......你知道这个问题或解决方法吗?

我使用Java 11

我的代码:

ForkJoinPool commonPool = ForkJoinPool.commonPool();
        commonPool.execute(() -> {
            try {
                String messageFileContent = Files.readString(Paths.get(
                        this.getClass().getClassLoader().getResource("xml-to-process.xml").getPath()));

                JAXBContext jaxbContext = JAXBContext.newInstance(ObjectFactory.class);
                Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
                // avoiding schema validation for more performance
                jaxbUnmarshaller.setSchema(null);
                UpdateWorkOrder updateWorkOrder = (UpdateWorkOrder) jaxbUnmarshaller.unmarshal(new StringReader(messageFileContent));
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
Run Code Online (Sandbox Code Playgroud)

这很奇怪不是...?在ForkJoinPool的execute()之外,解析是正确执行的。

错误是:

javax.xml.bind.JAXBException: Implementation of JAXB-API has not been found on module path or classpath.
with linked exception:
[java.lang.ClassNotFoundException: com.sun.xml.internal.bind.v2.ContextFactory]
java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at …
Run Code Online (Sandbox Code Playgroud)

java jaxb forkjoinpool

7
推荐指数
2
解决办法
4030
查看次数

为什么 Java 17 在将任务添加到 ForkJoinPool 时会抛出 RejectedExecutionException?

我使用 Java 16 通过 HTTP 向 API 发出请求。为了整体加快速度,我将其加载到自定义ForkJoinPool. 我在下面编译了一个重现示例。

自从迁移到 Java 17(openjdk build 17.0.1+12-39)后,这会抛出 RejectedExecutionException:

Caused by: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
    at java.base/java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:1819)
    at java.base/java.util.concurrent.ForkJoinPool.compensatedBlock(ForkJoinPool.java:3446)
    at java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3432)
    at java.base/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1898)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2072)
    at java.net.http/jdk.internal.net.http.HttpClientImpl.send(HttpClientImpl.java:553)
    at java.net.http/jdk.internal.net.http.HttpClientFacade.send(HttpClientFacade.java:119)
    at Test.lambda$retrieveMany$1(Test.java:30)
Run Code Online (Sandbox Code Playgroud)

为什么会出现这种情况?ForkJoinPool 是否发生了我不知道的变化?

代码

Caused by: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
    at java.base/java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:1819)
    at java.base/java.util.concurrent.ForkJoinPool.compensatedBlock(ForkJoinPool.java:3446)
    at java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3432)
    at java.base/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1898)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2072)
    at java.net.http/jdk.internal.net.http.HttpClientImpl.send(HttpClientImpl.java:553)
    at java.net.http/jdk.internal.net.http.HttpClientFacade.send(HttpClientFacade.java:119)
    at Test.lambda$retrieveMany$1(Test.java:30)
Run Code Online (Sandbox Code Playgroud)

java forkjoinpool java-17 openjdk-17

7
推荐指数
1
解决办法
2372
查看次数

如何配置单线程ForkJoinPool?

是否可以配置ForkJoinPool为使用1个执行线程?

我正在执行Random在一个内部调用的代码ForkJoinPool.每次运行时,我都会遇到不同的运行时行为,因此很难调查回归.

我希望代码库提供"调试"和"发布"模式."debug"模式将Random使用固定种子配置,并ForkJoinPool使用单个执行线程."release"模式将使用系统提供的Random种子并使用默认的ForkJoinPool线程数.

我尝试ForkJoinPool使用1的并行性配置,但它使用2个线程(main和第二个工作线程).有任何想法吗?

java multithreading fork-join forkjoinpool

6
推荐指数
1
解决办法
1333
查看次数

如何在CompletableFuture.supplyAsync(Supplier <U>供应商)方法中使用所需数量的工作线程设置ForkJoinPool?

根据Oracle,

static CompletableFuture supplyAsync(供应商供应商) 返回一个新的CompletableFuture,它由在ForkJoinPool.commonPool()中运行的任务异步完成,其中包含通过调用给定供应商获得的值.

static CompletableFuture supplyAsync(供应商供应商,执行程序执行程序)返回由给定执行程序中运行的任务异步完成的新CompletableFuture,其中包含通过调用给定供应商获得的值.

如果我使用"静态CompletableFuture supplyAsync(供应商供应商)"方法,它默认使用ForkJoinPool.commonPool().这将返回一个ForkJoinPool,其工作线程数等于正在运行的计算机中的可用核心数.

但是,我想使用ForkJoinPool和我自定义的工作线程数.使用ForkJoinPool.commonPool()我不能这样做.

那么如何使用我所声明的ForkJoinPool 使用CompletableFuture.supplyAsync方法使用我想要的工作线程数?

java multithreading forkjoinpool completable-future

6
推荐指数
1
解决办法
2617
查看次数

ForkJoinPool和Future.Get

假设我有一个具有并行度n的ForkJoinPool设置,并且我调用这样的并行计算:

 workpool.submit(
            () -> {
                    objects.values().parallelStream().forEach(obj -> {
                        obj.foo();
                    });
                });
Run Code Online (Sandbox Code Playgroud)

我这样做是为了确保在那里生成的线程在工作池中创建(我有需要隔离的系统的不同组件).现在假设调用它的线程也在这个工作池中执行,我这样做:

 Future<?> wait =  workpool.submit(
            () -> {
                    objects.values().parallelStream().forEach(obj -> {
                        obj.foo();
                    });
                });
 wait.get()
Run Code Online (Sandbox Code Playgroud)

1)我是否阻止了ForkJoinPool中的线程?如果我在期货中有n个线程全部阻止,而在尝试在工作池中安排任务时,这是否会导致死锁?我不清楚ForkJoinPool中的"最大并行度"是否意味着(如果有n个非阻塞任务),总会有n个线程在执行,或者是否有固定数量的线程,无论是否有阻止.如果我使用wait.join()代替wait.join(我不需要检查异常,因为此代码中抛出的任何异常都会生成runtimeexception.如果我理解正确,join()将允许线程在等待时执行排队的任务)

2)如果我通过执行() - > {}创建一个可运行的"包装器"类,我仍然可以获得并行流的轻量级forkjoin任务的好处

3)使用它是否有任何缺点/好处(假设.join()确实实现了我认为它的工作窃取行为):

        CompletableFuture.supplyAsync(this::mylambdafunction, workpool)  
             .thenAccept(this::mynextfunction);
Run Code Online (Sandbox Code Playgroud)

java asynchronous java-8 forkjoinpool

6
推荐指数
1
解决办法
770
查看次数

并行流在tomcat升级后不设置Thread.contextClassLoader

在tomcat从8.5.6升级到8.5.28后,并行流停止为contexts提供contextClassLoader:

因为它Warmer::run无法在其中加载类.

warmers.parallelStream().forEach(Warmer::run);
Run Code Online (Sandbox Code Playgroud)

你有什么想法Tomcat为新线程提供contextClassLoaders的东西吗?

ParallelStream在最新的Tomcat中使用ForkJoinPool.

java contextclassloader forkjoinpool tomcat8 java-stream

6
推荐指数
2
解决办法
824
查看次数

ForkJoinPool - 为什么程序抛出OutOfMemoryError?

我想在Java 8中试用ForkJoinPool,所以我编写了一个小程序,用于搜索名称中包含给定目录中特定关键字的所有文件.

计划:

public class DirectoryService {

    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        List<String> files = pool.invoke(task);
        pool.shutdown();
        System.out.println("Total  no of files with hello" + files.size());
    }

}

    class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
        private String path;
        public FileSearchRecursiveTask(String path) {
            this.path = path;
        }

        @Override
        protected List<String> compute() {
            File mainDirectory = new File(path);
            List<String> filetedFileList = new ArrayList<>();
            List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
            if(mainDirectory.isDirectory()) {
                System.out.println(Thread.currentThread() + " …
Run Code Online (Sandbox Code Playgroud)

java multithreading fork-join java-8 forkjoinpool

6
推荐指数
1
解决办法
846
查看次数

为什么ForkJoinPool :: invoke()会阻塞主线程?

免责声明:这是我第一次使用Java的Fork-Join框架,所以我并不是100%确定我正确使用它.Java也不是我的主要编程语言,所以这也可能是相关的.


鉴于以下SSCCE:

import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

class ForkCalculator extends RecursiveAction
{
    private final Integer[] delayTasks;

    public ForkCalculator(Integer[] delayTasks)
    {
        this.delayTasks = delayTasks;
    }

    @Override
    protected void compute()
    {
        if (this.delayTasks.length == 1) {
            this.computeDirectly();
            return;
        }

        Integer halfway = this.delayTasks.length / 2;

        ForkJoinTask.invokeAll(
            new ForkCalculator(
                Arrays.copyOfRange(this.delayTasks, 0, halfway)
            ),
            new ForkCalculator(
                Arrays.copyOfRange(this.delayTasks, halfway, this.delayTasks.length)
            )
        );
    }

    private void computeDirectly()
    {
        Integer delayTask = this.delayTasks[0];

        try {
            Thread.sleep(delayTask);
        } catch …
Run Code Online (Sandbox Code Playgroud)

java fork-join java-8 forkjoinpool

6
推荐指数
1
解决办法
545
查看次数

何时创建新的 ForkJoinPool 以及何时使用 CommonPool?

我正在阅读线程并了解 fork/join API。

我发现您可以使用 commonPool 作为管理线程的默认池运行线程,或者我可以将线程提交到新创建的 ForkJoinPool。

两者之间的区别如下,据我所知:

  • commonPool 是静态创建的主池(其中一些池方法不像它们通常对其他池那样工作,例如关闭它),并且主要用于运行应用程序。
  • default/commonPool 中的并行数是内核数 - 1,其中新创建的池的默认并行数 = 内核数(或系统属性指定的数量parallelism- 我忽略了完全限定的系统属性键名-)。

根据文档,commonPool 适用于大多数用途。

这一切都归结为我的问题:

什么时候应该使用公共池?为什么会这样?我应该什么时候创建一个新池?为什么会这样?

java multithreading forkjoinpool

6
推荐指数
1
解决办法
1078
查看次数