Java 8中的默认"paralellStream()"使用公共ForkJoinPool
,如果在提交任务时公共池线程用尽,则可能是延迟问题.但是,在许多情况下,有足够的CPU功率可用且任务足够短,因此这不是问题.如果我们确实有一些长期运行的任务,这当然需要仔细考虑,但对于这个问题,让我们假设这不是问题.
但是ForkJoinPool
,即使有足够的CPU功率,填充实际上不执行任何CPU绑定工作的I/O任务也是一种引入瓶颈的方法.我明白了.然而,这就是我们所拥有的ManagedBlocker
.因此,如果我们有一个I/O任务,我们应该只允许ForkJoinPool
在一个内部管理它ManagedBlocker
.听起来非常简单.但令我惊讶的ManagedBlocker
是,使用一个相当复杂的API来实现简单的事情.毕竟我认为这是一个常见的问题.所以我只是构建了一个简单的实用方法,使其ManagedBlocker
易于用于常见情况:
public class BlockingTasks {
public static<T> T callInManagedBlock(final Supplier<T> supplier) {
final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier);
try {
ForkJoinPool.managedBlock(managedBlock);
} catch (InterruptedException e) {
throw new Error(e);
}
return managedBlock.getResult();
}
private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker {
private final Supplier<T> supplier;
private T result;
private boolean done = false;
private SupplierManagedBlock(final Supplier<T> supplier) {
this.supplier = supplier;
}
@Override
public boolean block() {
result = supplier.get();
done = true;
return true;
}
@Override
public boolean isReleasable() {
return done;
}
public T getResult() {
return result;
}
}
}
Run Code Online (Sandbox Code Playgroud)
现在,如果我想在paralell下载几个网站的html代码,我可以像这样没有I/O造成任何麻烦:
public static void main(String[] args) {
final List<String> pagesHtml = Stream
.of("https://google.com", "https://stackoverflow.com", "...")
.map((url) -> BlockingTasks.callInManagedBlock(() -> download(url)))
.collect(Collectors.toList());
}
Run Code Online (Sandbox Code Playgroud)
我有点惊讶的是,BlockingTasks
上面没有像Java 一样附带的类(或者我没有找到它?),但是构建起来并不难.
当我谷歌搜索"java 8并行流"时,我在前四个结果中得到那些声称由于I/O问题Fork/Join糟透了Java的文章:
ManagedBlocker
但也说"在不同的用例中你可以给它一个ManagedBlocker实例"在这种情况下,它没有提到为什么不这样做.我在某种程度上改变了我的搜索条件,虽然有很多人在抱怨生活多么可怕,但我发现没有人会像上面那样谈论解决方案.由于我不喜欢Marvin(像行星一样的大脑)和Java 8可用了很长一段时间,我怀疑我在那里建议的东西有些严重错误.
我拼凑了一个小测试:
public static void main(String[] args) {
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Start");
IntStream.range(0, 10).parallel().forEach((x) -> sleep());
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": End");
}
public static void sleep() {
try {
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Sleeping " + Thread.currentThread().getName());
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new Error(e);
}
}
Run Code Online (Sandbox Code Playgroud)
我跑了,得到了以下结果:
18:41:29.021: Start
18:41:29.033: Sleeping main
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-2
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-5
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-4
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-6
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-3
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-7
18:41:39.034: Sleeping main
18:41:39.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:49.035: End
Run Code Online (Sandbox Code Playgroud)
因此,在我的8 CPU计算机上,ForkJoinPool
自然选择8个线程,完成前8个任务,最后完成最后两个任务,这意味着这需要20秒,如果有其他任务排队,则池仍然可能没有使用明显空闲的CPU(除了最后10秒内有6个核心).
然后我用...
IntStream.range(0, 10).parallel().forEach((x) -> callInManagedBlock(() -> { sleep(); return null; }));
Run Code Online (Sandbox Code Playgroud)
...代替...
IntStream.range(0, 10).parallel().forEach((x) -> sleep());
Run Code Online (Sandbox Code Playgroud)
......并得到以下结果:
18:44:10.93: Start
18:44:10.945: Sleeping main
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-7
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-1
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-6
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-3
18:44:10.955: Sleeping ForkJoinPool.commonPool-worker-2
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-4
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-5
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-0
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-11
18:44:20.957: End
Run Code Online (Sandbox Code Playgroud)
它看起来像这样工作,额外的线程开始补偿我的模拟"阻止I/O操作"(睡眠).时间减少到10秒,我想如果我排队更多的任务,那些仍然可以使用可用的CPU功率.
如果I/O操作包含在一个ManagedBlock
?中,这个解决方案是否有任何问题,或者通常在流中使用I/O ?
Tag*_*eev 12
简而言之,是的,您的解决方案存在一些问题.它肯定改进了在并行流中使用阻塞代码,并且一些第三方库提供了类似的解决方案(例如,参见Blocking
jOOλ库中的类).但是,此解决方案不会更改Stream API中使用的内部拆分策略.Stream API创建的子任务数由AbstractTask
类中的预定义常量控制:
/**
* Default target factor of leaf tasks for parallel decomposition.
* To allow load balancing, we over-partition, currently to approximately
* four tasks per processor, which enables others to help out
* if leaf tasks are uneven or some processors are otherwise busy.
*/
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
Run Code Online (Sandbox Code Playgroud)
正如您所看到的,它比普通池并行性(默认为CPU核心数)大四倍.真正的分裂算法有点棘手,但即使所有这些任务都是阻塞的,你也不可能拥有超过4x-8x的任务.
例如,如果您有8个CPU内核,那么您的Thread.sleep()
测试可以很好地工作IntStream.range(0, 32)
(如32 = 8*4).但是,对于IntStream.range(0, 64)
您将有32个并行任务,每个处理两个输入数字,因此整个处理将花费20秒,而不是10秒.
归档时间: |
|
查看次数: |
1588 次 |
最近记录: |