我刚刚在取消ForkJoinPool返回的Future时注意到以下现象.给出以下示例代码:
ForkJoinPool pool = new ForkJoinPool();
Future<?> fut = pool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
while (true) {
if (Thread.currentThread().isInterrupted()) { // <-- never true
System.out.println("interrupted");
throw new InterruptedException();
}
}
}
});
Thread.sleep(1000);
System.out.println("cancel");
fut.cancel(true);
Run Code Online (Sandbox Code Playgroud)
该程序永远不会打印interrupted
.ForkJoinTask#cancel(boolean)的文档说:
mayInterruptIfRunning - 此值在默认实现中无效,因为中断不用于控制取消.
如果ForkJoinTasks忽略了中断,你应该如何检查提交给ForkJoinPool的Callables中的取消?
我正在阅读Java ForkJoin框架.通过不直接调用(例如)invoke()
的实现,但实例化和调用有什么额外的好处?当我们称这两种方法全部被调用时到底发生了什么?ForkJoinTask
RecursiveTask
ForkJoinPool
pool.invoke(task)
invoke
从源代码来看,似乎如果recursiveTask.invoke
被调用,它将以托管线程池的方式调用它exec
并最终调用它compute
.因此,为什么我们有这个成语更令人困惑pool.invoke(task)
.
我写了一些简单的代码来测试性能差异,但我没有看到任何.也许测试代码错了?见下文:
public class MyForkJoinTask extends RecursiveAction {
private static int totalWorkInMillis = 20000;
protected static int sThreshold = 1000;
private int workInMillis;
public MyForkJoinTask(int work) {
this.workInMillis = work;
}
// Average pixels from source, write results into destination.
protected void computeDirectly() {
try {
ForkJoinTask<Object> objectForkJoinTask = new ForkJoinTask<>();
Thread.sleep(workInMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
protected void …
Run Code Online (Sandbox Code Playgroud) java concurrency java.util.concurrent fork-join forkjoinpool
我一直在学习Java 8的并行流,这篇文章出现在Google搜索的第一页上.我无法理解文章中的这一特定内容
...所有并行流都使用公共fork-join线程池,如果您提交长时间运行的任务,则可以有效地阻止池中的所有线程.
我无法弄清楚长时间运行的任务如何阻止池中的所有其他线程.
我需要在其队列已满时阻止ForkJoinPool上的线程.这可以在标准的ThreadPoolExecutor中完成,例如:
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
Run Code Online (Sandbox Code Playgroud)
我知道,ForkJoinPool中有一些Dequeue,但是我无法通过它访问它.
更新:请参阅下面的答案.
在阅读了有关ForkJoinPool之后,我尝试了一个实验来测试ForkJoinPool
与普通递归相比实际有多快.
我递归地计算了一个文件夹中的文件数,而且对我来说,普通的递归方式表现得更好 ForkJoinPool
这是我的代码.
递归任务
class DirectoryTask extends RecursiveTask<Long> {
private Directory directory;
@Override
protected Long compute() {
List<RecursiveTask<Long>> forks = new ArrayList<>();
List<Directory> directories = directory.getDirectories();
for (Directory directory : directories) {
DirectoryTask directoryTask = new DirectoryTask(directory);
forks.add(directoryTask);
directoryTask.fork();
}
Long count = directory.getDoumentCount();
for (RecursiveTask<Long> task : forks) {
count += task.join();
}
return count;
}
}
Run Code Online (Sandbox Code Playgroud)
普通递归
private static Long getFileCount(Directory directory) {
Long recursiveCount = 0L;
List<Directory> directories = directory.getDirectories();
if (null …
Run Code Online (Sandbox Code Playgroud) 我有下一个实现RecursiveAction
,这个类的单一目的 - 是从 0 到 9 打印,但如果可能的话,从不同的线程打印:
public class MyRecursiveAction extends RecursiveAction {
private final int num;
public MyRecursiveAction(int num) {
this.num = num;
}
@Override
protected void compute() {
if (num < 10) {
System.out.println(num);
new MyRecursiveAction(num + 1).fork();
}
}
}
Run Code Online (Sandbox Code Playgroud)
我认为调用awaitQuiescence
将使当前线程等待所有任务(提交和分叉)完成:
public class Main {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.execute(new MyRecursiveAction(0));
System.out.println(forkJoinPool.awaitQuiescence(5, TimeUnit.SECONDS) ? "tasks" : "time");
}
}
Run Code Online (Sandbox Code Playgroud)
但我并不总是得到正确的结果,而不是打印 10 次,打印 0 到 10 次。 …
相关:ParallelStream 上的 CompletableFuture 被批处理并且运行速度比顺序流慢?
我正在研究通过 parallelStream 和 CompletableFutures 并行化网络调用的不同方法。因此,我遇到过这种情况,Java 的 parallelStream 使用的 ForkJoinPool.commonPool() 的大小动态增长,从 ~ #Cores 到最大值 64。
爪哇细节:
$ java -version
openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.10+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.10+9, mixed mode)
Run Code Online (Sandbox Code Playgroud)
显示这种行为的代码如下(完整的可执行代码在这里)
openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.10+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.10+9, mixed mode)
Run Code Online (Sandbox Code Playgroud)
示例输出:
16:07:17.443 [pool-2-thread-7] INFO generalworks.parallelism.DummyProcess - 44 going to sleep. poolsize: 11
16:07:17.443 [pool-2-thread-9] INFO generalworks.parallelism.DummyProcess …
Run Code Online (Sandbox Code Playgroud) java parallel-processing multithreading forkjoinpool java-stream
我正在使用parallelStream 并行上传一些文件,有些是大文件,有些是小文件。我注意到并非所有工人都被使用。
一开始一切都运行良好,所有线程都被使用(我将并行度选项设置为 16)。然后在某一时刻(一旦到达更大的文件),它只使用一个线程
简化代码:
files.parallelStream().forEach((file) -> {
try (FileInputStream fileInputStream = new FileInputStream(file)) {
IDocumentStorageAdaptor uploader = null;
try {
logger.debug("Adaptors before taking: " + uploaderPool.size());
uploader = uploaderPool.take();
logger.debug("Took an adaptor!");
logger.debug("Adaptors after taking: " + uploaderPool.size());
uploader.addNewFile(file);
} finally {
if (uploader != null) {
logger.debug("Adding one back!");
uploaderPool.put(uploader);
logger.debug("Adaptors after putting: " + uploaderPool.size());
}
}
} catch (InterruptedException | IOException e) {
throw new UploadException(e);
}
});
Run Code Online (Sandbox Code Playgroud)
uploaderPool 是一个 ArrayBlockingQueue。日志:
[ForkJoinPool.commonPool-worker-8] - Adaptors before taking: …
Run Code Online (Sandbox Code Playgroud) 以下代码导致死锁(在我的电脑上):
public class Test {
static {
final int SUM = IntStream.range(0, 100)
.parallel()
.reduce((n, m) -> n + m)
.getAsInt();
}
public static void main(String[] args) {
System.out.println("Finished");
}
}
Run Code Online (Sandbox Code Playgroud)
但是,如果我reduce
用匿名类替换lambda参数,它不会导致死锁:
public class Test {
static {
final int SUM = IntStream.range(0, 100)
.parallel()
.reduce(new IntBinaryOperator() {
@Override
public int applyAsInt(int n, int m) {
return n + m;
}
})
.getAsInt();
}
public static void main(String[] args) {
System.out.println("Finished");
}
}
Run Code Online (Sandbox Code Playgroud)
你能解释一下这种情况吗?
我发现代码(与以前有点不同):
public class …
Run Code Online (Sandbox Code Playgroud) 我想在我的 Spring Boot 项目中使用 ForkJoinPool 和 @Async 注释,比如 ThreadPoolTaskExecutor
例如 :-
@Bean("threadPoolTaskExecutor")
public TaskExecutor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(1000);
executor.setThreadNamePrefix("Async-");
return executor;
}
Run Code Online (Sandbox Code Playgroud)
我在我的代码中使用https://dzone.com/articles/spring-boot-creating-asynchronous-methods-using-作为这个链接,但我想像这样使用 ForkJoinPool 。
forkjoinpool ×10
java ×9
java-stream ×3
concurrency ×2
fork-join ×2
java-8 ×2
deadlock ×1
recursion ×1
spring ×1
spring-boot ×1