鉴于我有一个字符串列表List<String> toProcess
.结果必须按原始线的顺序排列.我想利用新的并行流.
以下代码是否保证结果与原始列表中的顺序相同?
// ["a", "b", "c"]
List<String> toProcess;
// should be ["a", "b", "c"]
List<String> results = toProcess.parallelStream()
.map(s -> s)
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud) 环境:Ubuntu x86_64(14.10),Oracle JDK 1.8u25
我尝试使用并行流Files.lines()
但我想要.skip()
第一行(它是带有标题的CSV文件).所以我试着这样做:
try (
final Stream<String> stream = Files.lines(thePath, StandardCharsets.UTF_8)
.skip(1L).parallel();
) {
// etc
}
Run Code Online (Sandbox Code Playgroud)
但是后来一列未能解析成一个int ...
所以我尝试了一些简单的代码.文件问题很简单:
$ cat info.csv
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
1422758875023;34;54;151;4375;4375;27486
$
Run Code Online (Sandbox Code Playgroud)
代码同样简单:
public static void main(final String... args)
{
final Path path = Paths.get("/home/fge/tmp/dd/info.csv");
Files.lines(path, StandardCharsets.UTF_8).skip(1L).parallel()
.forEach(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)
我系统地得到以下结果(好吧,我只运行了大约20次):
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
Run Code Online (Sandbox Code Playgroud)
我在这里错过了什么?
编辑似乎问题或误解比这更根深蒂固(下面的两个例子是由FreeNode的## java编写的):
public static void main(final String... args)
{
new BufferedReader(new StringReader("Hello\nWorld")).lines()
.skip(1L).parallel()
.forEach(System.out::println);
final Iterator<String> iter
= Arrays.asList("Hello", "World").iterator();
final Spliterator<String> spliterator
= Spliterators.spliteratorUnknownSize(iter, …
Run Code Online (Sandbox Code Playgroud) 为什么要forEach
以随机顺序打印数字,同时collect
始终按原始顺序收集元素,即使是从并行流中收集?
Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8};
List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(intArray));
System.out.println("Parallel Stream: ");
listOfIntegers
.stream()
.parallel()
.forEach(e -> System.out.print(e + " "));
System.out.println();
// Collectors
List<Integer> l = listOfIntegers
.stream()
.parallel()
.collect(Collectors.toList());
System.out.println(l);
Run Code Online (Sandbox Code Playgroud)
输出:
Parallel Stream:
8 1 6 2 7 4 5 3
[1, 2, 3, 4, 5, 6, 7, 8]
Run Code Online (Sandbox Code Playgroud) 我已经阅读过这个和这个问题,但仍然怀疑Stream.skip
JDK作者是否打算观察到这种行为.
让我们简单输入数字1..20:
List<Integer> input = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
现在让我们创建一个并行流,以不同的方式结合unordered()
使用skip()
并收集结果:
System.out.println("skip-skip-unordered-toList: "
+ input.parallelStream().filter(x -> x > 0)
.skip(1)
.skip(1)
.unordered()
.collect(Collectors.toList()));
System.out.println("skip-unordered-skip-toList: "
+ input.parallelStream().filter(x -> x > 0)
.skip(1)
.unordered()
.skip(1)
.collect(Collectors.toList()));
System.out.println("unordered-skip-skip-toList: "
+ input.parallelStream().filter(x -> x > 0)
.unordered()
.skip(1)
.skip(1)
.collect(Collectors.toList()));
Run Code Online (Sandbox Code Playgroud)
过滤步骤在这里基本没什么,但为流引擎增加了更多的难度:现在它不知道输出的确切大小,因此关闭了一些优化.我有以下结果:
skip-skip-unordered-toList: [3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// absent values: 1, 2
skip-unordered-skip-toList: …
Run Code Online (Sandbox Code Playgroud) 受这个问题的启发,我开始玩有序与无序流,并行与顺序流和终端操作,这些操作尊重遭遇顺序与不尊重它的终端操作.
在对链接问题的一个答案中,显示了与此类似的代码:
List<Integer> ordered = Arrays.asList(
1, 2, 3, 4, 4, 3, 2, 1, 1, 2, 3, 4, 4, 3, 2, 1, 1, 2, 3, 4);
List<Integer> result = new CopyOnWriteArrayList<>();
ordered.parallelStream().forEach(result::add);
System.out.println(ordered);
System.out.println(result);
Run Code Online (Sandbox Code Playgroud)
这些名单确实不同.该unordered
列表甚至从一次运行变为另一次运行,表明结果实际上是非确定性的.
所以我创建了另一个例子:
CopyOnWriteArrayList<Integer> result2 = ordered.parallelStream()
.unordered()
.collect(Collectors.toCollection(CopyOnWriteArrayList::new));
System.out.println(ordered);
System.out.println(result2);
Run Code Online (Sandbox Code Playgroud)
我希望看到类似的结果,因为流是并行和无序的(可能unordered()
是多余的,因为它已经是并行的).但是,生成的列表是有序的,即它等于源列表.
所以我的问题是为什么收集的清单是有序的?是否collect
总是尊重遭遇顺序,即使对于并行,无序的流?它Collectors.toCollection(...)
是强制遭遇秩序的特定收集者吗?
由于我使用了很多流,其中一些处理大量数据,我认为预先分配基于集合的收集器大小是一个好主意,以防止随着集合的增长而进行昂贵的重新分配.所以我想出了这个,以及类似的其他集合类型:
public static <T> Collector<T, ?, Set<T>> toSetSized(int initialCapacity) {
return Collectors.toCollection(()-> new HashSet<>(initialCapacity));
}
Run Code Online (Sandbox Code Playgroud)
像这样使用
Set<Foo> fooSet = myFooStream.collect(toSetSized(100000));
Run Code Online (Sandbox Code Playgroud)
我担心的是,实现Collectors.toSet()
设置一个没有的Characteristics
枚举Collectors.toCollection()
:Characteristics.UNORDERED
.没有方便的变体Collectors.toCollection()
来设置超出默认值的所需特性,并且Collectors.toSet()
由于可见性问题我无法复制实现.所以,为了设置这个UNORDERED
特性我不得不这样做:
static<T> Collector<T,?,Set<T>> toSetSized(int initialCapacity){
return Collector.of(
() -> new HashSet<>(initialCapacity),
Set::add,
(c1, c2) -> {
c1.addAll(c2);
return c1;
},
new Collector.Characteristics[]{IDENTITY_FINISH, UNORDERED});
}
Run Code Online (Sandbox Code Playgroud)
所以这是我的问题:1.这是我唯一的选择,为简单的自定义toSet()
2 创建无序收集器.如果我希望这个理想地工作,是否有必要应用无序特征?我在这个论坛上读到了一个问题,在那里我了解到无序特征不再向后传播到Stream中.它仍然有用吗?
订购
流可能有也可能没有已定义的遭遇顺序.流是否具有遭遇顺序取决于源和中间操作.某些流源(例如List或数组)本质上是有序的,而其他流(例如HashSet)则不是.某些中间操作(例如sorted())可能会在其他无序流上强制执行遭遇顺序,而其他中间操作可能会呈现无序的有序流,例如BaseStream.unordered().此外,一些终端操作可以忽略遭遇顺序,例如forEach().
HashSet
?unordered
将在并行计算的每个流上的中间操作?