Cer*_*ber 2 java parallel-processing java-8 java-stream
我偶然发现了一个相当烦人的问题:我有一个包含大量数据源的程序,它能够传输相同类型的元素,并且我想"映射"程序中的每个可用元素(元素顺序不物).
因此我试图将我Stream<Stream<T>> streamOfStreamOfT;
简化为简单的Stream<T> streamOfT;
使用streamOfT = streamOfStreamOfT.reduce(Stream.empty(), Stream::concat);
由于元素顺序对我来说并不重要,我尝试将reduce操作并行化为.parallel()
:streamOfT = streamOfStreamOfT.parallel().reduce(Stream.empty(), Stream::concat);
但是这会触发一个java.lang.IllegalStateException: stream has already been operated upon or closed
要亲自体验它,只需通过注释/取消注释即可使用以下主要内容(java 1.8u20) .parallel()
public static void main(String[] args) {
// GIVEN
List<Stream<Integer>> listOfStreamOfInts = new ArrayList<>();
for (int j = 0; j < 10; j++) {
IntStream intStreamOf10Ints = IntStream.iterate(0, i -> i + 1)
.limit(10);
Stream<Integer> genericStreamOf10Ints = StreamSupport.stream(
intStreamOf10Ints.spliterator(), true);
listOfStreamOfInts.add(genericStreamOf10Ints);
}
Stream<Stream<Integer>> streamOfStreamOfInts = listOfStreamOfInts
.stream();
// WHEN
Stream<Integer> streamOfInts = streamOfStreamOfInts
// ////////////////
// PROBLEM
// |
// V
.parallel()
.reduce(Stream.empty(), Stream::concat);
// THEN
System.out.println(streamOfInts.map(String::valueOf).collect(
joining(", ")));
}
Run Code Online (Sandbox Code Playgroud)
有人可以解释这个限制吗?/找到一种更好的方法来处理流的并行减少
继@Smutje和@LouisWasserman评论之后,似乎这.flatMap(Function.identity())
是一个容忍.parallel()
流的更好的选择
reduce
您正在使用的形式采用身份值和关联组合功能.但这Stream.empty()
不是一个价值; 它有州.流不是数组或集合之类的数据结构; 它们是通过可能并行的聚合操作来推送数据的载体,它们具有一些状态(比如流是否被消耗.)想想它是如何工作的; 你要构建一棵树,其中同一个"空"流出现在多个叶子中.当你尝试两次使用这个有状态的非同一性(它不会顺序发生,但会并行发生)时,第二次尝试遍历那个空流时,它将被正确地看作已被使用.
所以问题是,你只是reduce
错误地使用这种方法.问题不在于并行性; 简单来说,并行性暴露了潜在的问题.
其次,即使按照你认为应该的方式"工作",你也只能建立代表扁平流的树的并行性; 当你去加入时,那是一个连续的流管道.糟糕!
第三,即使你按照你认为的方式"工作",你也会通过建立连接流来增加很多元素访问开销,而你却不会从你正在寻求的并行性中获益.
简单的答案是平整流:
String joined = streamOfStreams.parallel()
.flatMap(s -> s)
.collect(joining(", "));
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1963 次 |
最近记录: |