AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
.map(record -> new Record(recordNumber.incrementAndGet(), record))
.parallel()
.filter(record -> doSomeOperation())
.findFirst()
Run Code Online (Sandbox Code Playgroud)
当我写这篇文章时,我假设线程将只产生 map 调用,因为 parallel 放在 map 之后。但是文件中的某些行在每次执行时都获得了不同的记录号。
我阅读了官方Java 流文档和一些网站,以了解流在幕后是如何工作的。
几个问题:
Java 并行流基于SplitIterator 工作,它由每个集合(如 ArrayList、LinkedList 等)实现。当我们从这些集合中构造并行流时,将使用相应的拆分迭代器来拆分和迭代集合。这解释了为什么并行发生在原始输入源(文件行)级别而不是映射的结果(即记录 pojo)。我的理解正确吗?
就我而言,输入是文件 IO 流。将使用哪个拆分迭代器?
我们parallel()在管道中的位置并不重要。原始输入源将始终被拆分,并将应用剩余的中间操作。
在这种情况下,Java 不应该允许用户在管道中的任何地方放置并行操作,除了原始源。因为,对于那些不知道 java 流内部如何工作的人来说,这是错误的理解。我知道parallel()已经为 Stream 对象类型定义了操作,因此它以这种方式工作。但是,最好提供一些替代解决方案。
在上面的代码片段中,我试图为输入文件中的每条记录添加一个行号,因此应该对其进行排序。但是,我想doSomeOperation()并行应用,因为它是重量级逻辑。实现的一种方法是编写我自己的自定义拆分迭代器。有没有其他办法?
这解释了为什么并行发生在原始输入源(文件行)级别而不是映射的结果(即记录 pojo)。
整个流要么是并行的,要么是顺序的。我们不选择顺序或并行运行的操作子集。
当启动终端操作时,流管道将根据调用它的流的方向顺序或并行执行。[...] 当启动终端操作时,流管道将根据调用它的流的模式顺序或并行执行。同一来源
正如您所提到的,并行流使用拆分迭代器。显然,这是在操作开始运行之前对数据进行分区。
就我而言,输入是文件 IO 流。将使用哪个拆分迭代器?
查看源代码,我看到它使用 java.nio.file.FileChannelLinesSpliterator
我们将 parallel() 放在管道中的什么位置并不重要。原始输入源将始终被拆分,并将应用剩余的中间操作。
对。您甚至可以多次拨打parallel()和sequential()。最后调用的将获胜。当我们调用 时parallel(),我们为返回的流设置它;并且如上所述,所有操作要么按顺序运行,要么并行运行。
在这种情况下,Java 不应该允许用户在管道中的任何地方放置并行操作,除了原始源...
这变成了一个意见问题。我认为 Zabuza 提供了一个很好的理由来支持 JDK 设计者的选择。
实现的一种方法是编写我自己的自定义拆分迭代器。有没有其他办法?
这取决于您的操作
findFirst()是你真正的终端操作,那么你甚至不需要担心并行执行,因为doSomething()反正不会有很多调用(findFirst()是短路)。.parallel()实际上可能会导致处理多个元素,而findFirst()在顺序流上会阻止这种情况。如果您的终端操作不会创建太多数据,那么您可以Record使用顺序流创建对象,然后并行处理结果:
List<Record> smallData = Files.lines(inputFile.toPath(),
StandardCharsets.UTF_8)
.map(record -> new Record(recordNumber.incrementAndGet(), record))
.collect(Collectors.toList())
.parallelStream()
.filter(record -> doSomeOperation())
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)如果您的管道会在内存中加载大量数据(这可能是您使用 的原因Files.lines()),那么您可能需要一个自定义拆分迭代器。不过,在我去那里之前,我会研究其他选项(例如使用 id 列保存行 - 这只是我的意见)。
我还尝试小批量处理记录,如下所示:
AtomicInteger recordNumber = new AtomicInteger();
final int batchSize = 10;
try(BufferedReader reader = Files.newBufferedReader(inputFile.toPath(),
StandardCharsets.UTF_8);) {
Supplier<List<Record>> batchSupplier = () -> {
List<Record> batch = new ArrayList<>();
for (int i = 0; i < batchSize; i++) {
String nextLine;
try {
nextLine = reader.readLine();
} catch (IOException e) {
//hanlde exception
throw new RuntimeException(e);
}
if(null == nextLine)
return batch;
batch.add(new Record(recordNumber.getAndIncrement(), nextLine));
}
System.out.println("next batch");
return batch;
};
Stream.generate(batchSupplier)
.takeWhile(list -> list.size() >= batchSize)
.map(list -> list.parallelStream()
.filter(record -> doSomeOperation())
.collect(Collectors.toList()))
.flatMap(List::stream)
.forEach(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)
这会doSomeOperation()并行执行,无需将所有数据加载到内存中。但请注意,batchSize这需要考虑一下。
最初的 Stream 设计包括支持具有不同并行执行设置的后续流水线阶段的想法,但该想法已被放弃。API 可能源于这个时间,但另一方面,强制调用者为并行或顺序执行做出单一明确决定的 API 设计会复杂得多。
实际Spliterator使用Files.lines(…)是依赖于实现的。在 Java 8(Oracle 或 OpenJDK)中,你总是得到与BufferedReader.lines(). 在较新的 JDK 中,如果Path属于默认文件系统并且字符集是此功能支持的字符集之一,则您将获得具有专用Spliterator实现的 Stream ,java.nio.file.FileChannelLinesSpliterator. 如果不满足先决条件,您将获得与 with 相同的结果BufferedReader.lines(),它仍然基于Iterator实现 insideBufferedReader并包装 via Spliterators.spliteratorUnknownSize。
您的特定任务最好使用自定义Spliterator来处理,该自定义可以在并行处理之前在源处执行行编号,以允许后续并行处理不受限制。
public static Stream<Record> records(Path p) throws IOException {
LineNoSpliterator sp = new LineNoSpliterator(p);
return StreamSupport.stream(sp, false).onClose(sp);
}
private static class LineNoSpliterator implements Spliterator<Record>, Runnable {
int chunkSize = 100;
SeekableByteChannel channel;
LineNumberReader reader;
LineNoSpliterator(Path path) throws IOException {
channel = Files.newByteChannel(path, StandardOpenOption.READ);
reader=new LineNumberReader(Channels.newReader(channel,StandardCharsets.UTF_8));
}
@Override
public void run() {
try(Closeable c1 = reader; Closeable c2 = channel) {}
catch(IOException ex) { throw new UncheckedIOException(ex); }
finally { reader = null; channel = null; }
}
@Override
public boolean tryAdvance(Consumer<? super Record> action) {
try {
String line = reader.readLine();
if(line == null) return false;
action.accept(new Record(reader.getLineNumber(), line));
return true;
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
@Override
public Spliterator<Record> trySplit() {
Record[] chunks = new Record[chunkSize];
int read;
for(read = 0; read < chunks.length; read++) {
int pos = read;
if(!tryAdvance(r -> chunks[pos] = r)) break;
}
return Spliterators.spliterator(chunks, 0, read, characteristics());
}
@Override
public long estimateSize() {
try {
return (channel.size() - channel.position()) / 60;
} catch (IOException ex) {
return 0;
}
}
@Override
public int characteristics() {
return ORDERED | NONNULL | DISTINCT;
}
}
Run Code Online (Sandbox Code Playgroud)