kli*_*ron 19 java parallel-processing nio java-8 java-stream
以下代码片段是获取目录列表的方法的一部分,在每个文件上调用extract方法并将生成的药物对象序列化为xml.
try(Stream<Path> paths = Files.list(infoDir)) {
paths
.parallel()
.map(this::extract)
.forEachOrdered(drug -> {
try {
marshaller.write(drug);
} catch (JAXBException ex) {
ex.printStackTrace();
}
});
}
Run Code Online (Sandbox Code Playgroud)
这是完全相同的完全相同的代码,但使用普通.list()调用来获取目录列表并调用.parallelStream()结果列表.
Arrays.asList(infoDir.toFile().list())
.parallelStream()
.map(f -> infoDir.resolve(f))
.map(this::extract)
.forEachOrdered(drug -> {
try {
marshaller.write(drug);
} catch (JAXBException ex) {
ex.printStackTrace();
}
});
Run Code Online (Sandbox Code Playgroud)
我的机器是四核MacBook Pro,Java v 1.8.0_60(内置1.8.0_60-b27).
我正在处理~7000个文件.平均3次运行:
第一版:有.parallel():20秒.没有.parallel():41秒
第二版:带.parallelStream():12秒.用.stream():41秒.
并行模式下的那8秒似乎是一个巨大的差异,因为extract从流中读取并完成所有繁重工作的方法和write执行最终写入的调用都没有改变.
Tag*_*eev 24
问题是Stream API的当前实现以及IteratorSpliterator针对未知大小源的当前实现严重地将这些源拆分为并行任务.你很幸运拥有超过1024个文件,否则你根本就没有并行化的好处.当前流API实现考虑了estimateSize()从中返回的值Spliterator.在IteratorSpliterator未知大小的回报Long.MAX_VALUE之前拆分和后缀总是返回Long.MAX_VALUE为好.它的分裂策略如下:
MAX_BATCH达到大小(即33554432个元素).ArraySpliterator对创建的数组的迭代作为前缀,将其自身保留为后缀.假设你有7000个文件.Stream API会询问估计的大小,IteratorSpliterator返回值Long.MAX_VALUE.好的,Stream API要求IteratorSpliterator拆分,它从底层收集1024个元素DirectoryStream到数组并分成ArraySpliterator(估计大小为1024)和它自己(估计大小仍然是Long.MAX_VALUE).由于Long.MAX_VALUE远远超过1024,Stream API决定继续拆分较大的部分,甚至不试图拆分较小的部分.所以整体拆分树是这样的:
IteratorSpliterator (est. MAX_VALUE elements)
| |
ArraySpliterator (est. 1024 elements) IteratorSpliterator (est. MAX_VALUE elements)
| |
/---------------/ |
| |
ArraySpliterator (est. 2048 elements) IteratorSpliterator (est. MAX_VALUE elements)
| |
/---------------/ |
| |
ArraySpliterator (est. 3072 elements) IteratorSpliterator (est. MAX_VALUE elements)
| |
/---------------/ |
| |
ArraySpliterator (est. 856 elements) IteratorSpliterator (est. MAX_VALUE elements)
|
(split returns null: refuses to split anymore)
Run Code Online (Sandbox Code Playgroud)
所以在那之后你要执行五个并行任务:实际上包含1024,2048,3072,856和0个元素.请注意,即使最后一个块有0个元素,它仍然会报告它有估计Long.MAX_VALUE元素,因此Stream API也会将其发送给它ForkJoinPool.糟糕的是,Stream API认为前四个任务的进一步分割是无用的,因为它们的估计大小要小得多.所以你得到的是非常不均匀的输入分割,最大利用四个CPU内核(即使你有更多).如果每个元素的处理对于任何元素大致相同,那么整个过程将等待最大部分(3072个元素)完成.所以最大加速可能是7000/3072 = 2.28x.因此,如果顺序处理需要41秒,那么并行流将需要大约41/2.28 = 18秒(这接近您的实际数字).
您的解决方案完全没问题.请注意,使用Files.list().parallel()您还将所有输入Path元素存储在内存中(在ArraySpliterator对象中).因此,如果手动将它们转储到内存中,则不会浪费更多内存List.像数组支持的列表实现ArrayList(当前由其创建Collectors.toList())可以均匀地分割而没有任何问题,这导致额外的加速.
为什么这种情况没有优化?当然,这不是一个不可能的问题(虽然实施可能非常棘手).对于JDK开发人员来说,这似乎不是高优先级问题.在邮件列表中有关于此主题的几个讨论.你可以在这里阅读Paul Sandoz的消息,他在那里评论我的优化工作.
作为替代方案,您可以使用专为DirectoryStream以下方面量身定制的自定义分离器:
public class DirectorySpliterator implements Spliterator<Path> {
Iterator<Path> iterator;
long est;
private DirectorySpliterator(Iterator<Path> iterator, long est) {
this.iterator = iterator;
this.est = est;
}
@Override
public boolean tryAdvance(Consumer<? super Path> action) {
if (iterator == null) {
return false;
}
Path path;
try {
synchronized (iterator) {
if (!iterator.hasNext()) {
iterator = null;
return false;
}
path = iterator.next();
}
} catch (DirectoryIteratorException e) {
throw new UncheckedIOException(e.getCause());
}
action.accept(path);
return true;
}
@Override
public Spliterator<Path> trySplit() {
if (iterator == null || est == 1)
return null;
long e = this.est >>> 1;
this.est -= e;
return new DirectorySpliterator(iterator, e);
}
@Override
public long estimateSize() {
return est;
}
@Override
public int characteristics() {
return DISTINCT | NONNULL;
}
public static Stream<Path> list(Path parent) throws IOException {
DirectoryStream<Path> ds = Files.newDirectoryStream(parent);
int splitSize = Runtime.getRuntime().availableProcessors() * 8;
DirectorySpliterator spltr = new DirectorySpliterator(ds.iterator(), splitSize);
return StreamSupport.stream(spltr, false).onClose(() -> {
try {
ds.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
只需更换Files.list,DirectorySpliterator.list它将平行并行,无需任何中间缓冲.这里我们使用DirectoryStream生成没有任何特定顺序的目录列表的事实,因此每个并行线程将只从它获取一个后续条目(以同步方式,因为我们已经有同步IO操作,额外的同步具有几乎没有任何开销) .并行顺序每次都会有所不同(即使forEachOrdered使用),但Files.list()也不保证顺序.
这里唯一不重要的部分是要创建多少并行任务.由于我们在遍历它之前不知道文件夹中有多少文件,因此最好将其availableProcessors()用作基础.我创建了关于8 x availableProcessors()单个任务,这似乎是一个很好的细粒度/粗粒度的折衷:如果每个元素处理不均匀,比处理器有更多的任务将有助于平衡负载.
| 归档时间: |
|
| 查看次数: |
2284 次 |
| 最近记录: |