我们可以向 提交两种类型的任务forkJoinPool。一个是RecursiveAction,另一个是RecursiveTask。
它们之间有什么区别?
我使用 jaxb 进行解组,但是当我使用 ForkJoinPoolexecute() 方法时,我得到一个“java.log.ClassNotFoundException:com.sun.xml.internal.bind.v2.ContextFactory”,但我确信存在在我的运行时的类路径中,因为当我不使用 ForkJoinPool 时它可以正常工作......你知道这个问题或解决方法吗?
我使用Java 11
我的代码:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
commonPool.execute(() -> {
try {
String messageFileContent = Files.readString(Paths.get(
this.getClass().getClassLoader().getResource("xml-to-process.xml").getPath()));
JAXBContext jaxbContext = JAXBContext.newInstance(ObjectFactory.class);
Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
// avoiding schema validation for more performance
jaxbUnmarshaller.setSchema(null);
UpdateWorkOrder updateWorkOrder = (UpdateWorkOrder) jaxbUnmarshaller.unmarshal(new StringReader(messageFileContent));
} catch (Exception e) {
e.printStackTrace();
}
});
Run Code Online (Sandbox Code Playgroud)
这很奇怪不是...?在ForkJoinPool的execute()之外,解析是正确执行的。
错误是:
javax.xml.bind.JAXBException: Implementation of JAXB-API has not been found on module path or classpath.
with linked exception:
[java.lang.ClassNotFoundException: com.sun.xml.internal.bind.v2.ContextFactory]
java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at …Run Code Online (Sandbox Code Playgroud) 我使用 Java 16 通过 HTTP 向 API 发出请求。为了整体加快速度,我将其加载到自定义ForkJoinPool. 我在下面编译了一个重现示例。
自从迁移到 Java 17(openjdk build 17.0.1+12-39)后,这会抛出 RejectedExecutionException:
Caused by: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
at java.base/java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:1819)
at java.base/java.util.concurrent.ForkJoinPool.compensatedBlock(ForkJoinPool.java:3446)
at java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3432)
at java.base/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1898)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2072)
at java.net.http/jdk.internal.net.http.HttpClientImpl.send(HttpClientImpl.java:553)
at java.net.http/jdk.internal.net.http.HttpClientFacade.send(HttpClientFacade.java:119)
at Test.lambda$retrieveMany$1(Test.java:30)
Run Code Online (Sandbox Code Playgroud)
为什么会出现这种情况?ForkJoinPool 是否发生了我不知道的变化?
代码
Caused by: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
at java.base/java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:1819)
at java.base/java.util.concurrent.ForkJoinPool.compensatedBlock(ForkJoinPool.java:3446)
at java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3432)
at java.base/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1898)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2072)
at java.net.http/jdk.internal.net.http.HttpClientImpl.send(HttpClientImpl.java:553)
at java.net.http/jdk.internal.net.http.HttpClientFacade.send(HttpClientFacade.java:119)
at Test.lambda$retrieveMany$1(Test.java:30)
Run Code Online (Sandbox Code Playgroud) 是否可以配置ForkJoinPool为使用1个执行线程?
我正在执行Random在一个内部调用的代码ForkJoinPool.每次运行时,我都会遇到不同的运行时行为,因此很难调查回归.
我希望代码库提供"调试"和"发布"模式."debug"模式将Random使用固定种子配置,并ForkJoinPool使用单个执行线程."release"模式将使用系统提供的Random种子并使用默认的ForkJoinPool线程数.
我尝试ForkJoinPool使用1的并行性配置,但它使用2个线程(main和第二个工作线程).有任何想法吗?
根据Oracle,
static CompletableFuture supplyAsync(供应商供应商) 返回一个新的CompletableFuture,它由在ForkJoinPool.commonPool()中运行的任务异步完成,其中包含通过调用给定供应商获得的值.
static CompletableFuture supplyAsync(供应商供应商,执行程序执行程序)返回由给定执行程序中运行的任务异步完成的新CompletableFuture,其中包含通过调用给定供应商获得的值.
如果我使用"静态CompletableFuture supplyAsync(供应商供应商)"方法,它默认使用ForkJoinPool.commonPool().这将返回一个ForkJoinPool,其工作线程数等于正在运行的计算机中的可用核心数.
但是,我想使用ForkJoinPool和我自定义的工作线程数.使用ForkJoinPool.commonPool()我不能这样做.
那么如何使用我所声明的ForkJoinPool 使用CompletableFuture.supplyAsync方法使用我想要的工作线程数?
假设我有一个具有并行度n的ForkJoinPool设置,并且我调用这样的并行计算:
workpool.submit(
() -> {
objects.values().parallelStream().forEach(obj -> {
obj.foo();
});
});
Run Code Online (Sandbox Code Playgroud)
我这样做是为了确保在那里生成的线程在工作池中创建(我有需要隔离的系统的不同组件).现在假设调用它的线程也在这个工作池中执行,我这样做:
Future<?> wait = workpool.submit(
() -> {
objects.values().parallelStream().forEach(obj -> {
obj.foo();
});
});
wait.get()
Run Code Online (Sandbox Code Playgroud)
1)我是否阻止了ForkJoinPool中的线程?如果我在期货中有n个线程全部阻止,而在尝试在工作池中安排任务时,这是否会导致死锁?我不清楚ForkJoinPool中的"最大并行度"是否意味着(如果有n个非阻塞任务),总会有n个线程在执行,或者是否有固定数量的线程,无论是否有阻止.如果我使用wait.join()代替wait.join(我不需要检查异常,因为此代码中抛出的任何异常都会生成runtimeexception.如果我理解正确,join()将允许线程在等待时执行排队的任务)
2)如果我通过执行() - > {}创建一个可运行的"包装器"类,我仍然可以获得并行流的轻量级forkjoin任务的好处
3)使用它是否有任何缺点/好处(假设.join()确实实现了我认为它的工作窃取行为):
CompletableFuture.supplyAsync(this::mylambdafunction, workpool)
.thenAccept(this::mynextfunction);
Run Code Online (Sandbox Code Playgroud) 在tomcat从8.5.6升级到8.5.28后,并行流停止为contexts提供contextClassLoader:
因为它Warmer::run无法在其中加载类.
warmers.parallelStream().forEach(Warmer::run);
Run Code Online (Sandbox Code Playgroud)
你有什么想法Tomcat为新线程提供contextClassLoaders的东西吗?
ParallelStream在最新的Tomcat中使用ForkJoinPool.
我想在Java 8中试用ForkJoinPool,所以我编写了一个小程序,用于搜索名称中包含给定目录中特定关键字的所有文件.
计划:
public class DirectoryService {
public static void main(String[] args) {
FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
List<String> files = pool.invoke(task);
pool.shutdown();
System.out.println("Total no of files with hello" + files.size());
}
}
class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
private String path;
public FileSearchRecursiveTask(String path) {
this.path = path;
}
@Override
protected List<String> compute() {
File mainDirectory = new File(path);
List<String> filetedFileList = new ArrayList<>();
List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
if(mainDirectory.isDirectory()) {
System.out.println(Thread.currentThread() + " …Run Code Online (Sandbox Code Playgroud) 免责声明:这是我第一次使用Java的Fork-Join框架,所以我并不是100%确定我正确使用它.Java也不是我的主要编程语言,所以这也可能是相关的.
鉴于以下SSCCE:
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
class ForkCalculator extends RecursiveAction
{
private final Integer[] delayTasks;
public ForkCalculator(Integer[] delayTasks)
{
this.delayTasks = delayTasks;
}
@Override
protected void compute()
{
if (this.delayTasks.length == 1) {
this.computeDirectly();
return;
}
Integer halfway = this.delayTasks.length / 2;
ForkJoinTask.invokeAll(
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, 0, halfway)
),
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, halfway, this.delayTasks.length)
)
);
}
private void computeDirectly()
{
Integer delayTask = this.delayTasks[0];
try {
Thread.sleep(delayTask);
} catch …Run Code Online (Sandbox Code Playgroud) 我正在阅读线程并了解 fork/join API。
我发现您可以使用 commonPool 作为管理线程的默认池运行线程,或者我可以将线程提交到新创建的 ForkJoinPool。
两者之间的区别如下,据我所知:
parallelism- 我忽略了完全限定的系统属性键名-)。根据文档,commonPool 适用于大多数用途。
这一切都归结为我的问题:
什么时候应该使用公共池?为什么会这样?我应该什么时候创建一个新池?为什么会这样?
forkjoinpool ×10
java ×10
fork-join ×3
java-8 ×3
asynchronous ×1
java-17 ×1
java-stream ×1
jaxb ×1
openjdk-17 ×1
tomcat8 ×1