zic*_*ico 5 java parallel-processing java-stream
考虑以下简单代码:
Stream.of(1)
.flatMap(x -> IntStream.range(0, 1024).boxed())
.parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
.forEach(x -> {
System.out.println("Thread: " + Thread.currentThread().getName());
});
Run Code Online (Sandbox Code Playgroud)
很长一段时间,我认为即使在flatMap. 但是上面的代码打印了所有的“Thread:main”,证明我的想法是错误的。
一种使其并行的简单方法flatMap是收集然后再次流式传输:
Stream.of(1)
.flatMap(x -> IntStream.range(0, 1024).boxed())
.parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
.collect(Collectors.toList())
.parallelStream()
.forEach(x -> {
System.out.println("Thread: " + Thread.currentThread().getName());
});
Run Code Online (Sandbox Code Playgroud)
我想知道是否有更好的方法,以及flatMap仅在调用之前并行化流的设计选择,而不是在调用之后并行化。
========关于问题的更多说明========
从一些答案来看,我的问题似乎没有完全表达出来。正如@Andreas 所说,如果我从 3 个元素的 Stream 开始,可能有 3 个线程正在运行。
但我的问题确实是:根据这篇文章,Java Stream 使用一个通用的 ForkJoinPool,其默认大小等于内核数少 1 。现在假设我有 64 个内核,那么我希望我上面的代码会在 之后看到许多不同的线程flatMap,但实际上,它只看到一个(在 Andreas 的情况下是 3 个)。顺便说一句,我确实曾经isParallel观察过流是并行的。
老实说,我问这个问题并不是为了纯粹的学术兴趣。我在一个项目中遇到了这个问题,该项目提供了一长串用于转换数据集的流操作。该链从单个文件开始,然后通过 .bat 扩展为许多元素flatMap。但显然,在我的实验中,它并没有完全利用我的机器(它有 64 个内核),而只使用了一个内核(从 CPU 使用情况的观察来看)。
flatMap我想知道 [...] 关于仅在调用之前并行化流的设计选择,而不是在调用之后并行化流。
你错了。之前和之后的所有步骤flatMap都是并行运行的,但它只在线程之间分割原始流。然后,该flatMap操作由一个这样的线程处理,并且其流不会被分割。
由于您的原始流只有 1 个元素,因此无法拆分,因此parallel没有效果。
尝试更改为Stream.of(1, 2, 3),您将看到后面forEach的,实际上在 3 个不同的线程中运行。flatMap
| 归档时间: |
|
| 查看次数: |
293 次 |
| 最近记录: |