Ris*_*wal 5 java multithreading java-8 java-stream
我在网上搜索了各种文章和 Stack Overflow 问题,但我找不到完美的答案。有许多问题与此接近,但略有不同。
我们知道 Java 8 Streams API 在内部使用 Fork-Join Pool。
现在我的问题是如何使用 Fork-Join 池划分流管道中的任务?
假设我们有以下内容:
List myList = inputList.parallelStream().filter( x -> x>0 )
.map(x -> x+100 ).collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
现在我们有两种使用线程池划分任务的选项。
filter和map作为单个任务并使用 fork-join 池运行它。filter和map作为两个不同的任务,并使用两个不同的 fork-join 线程池运行它们。我也知道流是延迟传播的,所以如果我们 在两者之间有一个 有状态的中间操作:
List myList2 = inputList.parallelStream().filter( x -> x>0 )
.map(x -> x+5 ).sorted().map(x -> x+5 ).collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
那么如何创建线程池呢?
PS:之前就知道map功能是可以组合的。我只是想为这个问题举个例子。
首先,您必须使用parallel才能Fork-Join Pool处于活动状态。这个答案解释了Spliterator分裂是如何执行的;但简单来说,分割是使用流元素的源完成的,并且整个管道是并行处理的。在您的示例中,filter 正如 map您所说的那样(当然它terminal也包括操作)。
对于有状态操作 - 事情更加复杂。让我们举distinct个例子,首先看看它是如何处理顺序情况的。
一般来说,您可能会认为 anon-parallel distinct可以使用HashSet- 来实现,您是对的。HashSet可以保存所有已经看到的值,并且根本不处理(发送到下一个操作)其他元素 - 理论上您将完成非并行distinct操作。但如果Stream已知 是怎么办SORTED?想想看,这意味着我们可以保留一个HashSet标记为 的元素(而不是之前的 a)seen。基本上如果你有:
1,1,2,2,3
Run Code Online (Sandbox Code Playgroud)
这意味着您的有状态操作可以在单个元素之上实现 - 而不是HashSet; 代码会是这样的:
T seen = null;
....
if(seen == null || (!currentElement.equals(seen)){
seen = currentElement;
// process seen;
}
Run Code Online (Sandbox Code Playgroud)
但是这种优化只有当您知道流是 时才可能实现SORTED,因为这样您就知道下一个元素要么与您已经看到的元素相同,要么是一个新元素,这是您之前不可能看到的其他一些先前的操作 - 这是由排序操作保证的。
现在是如何parallel distinct实施的。你基本上会问这个问题:
那么线程池是如何创建的呢
同样,从 Stream 的角度来看,没有任何变化,ForkJoinPool使用相同数量的线程 - 显然,唯一改变的是流实现。
简而言之,如果您的Stream内部ORDERED实现使用 a LinkedHashSet(实际上是这个的多个实例,因为它确实在这种情况下减少了)来保留您的订单,并且ConcurrentHashMap如果您不关心订单,则它使用 a - 即如果源没有排序(如 a Set)或者您使用显式调用unordered。sorted如果您确实想知道它是如何完成的,您也可以查找其实现。
所以底线是 aFork Join Pool不会改变基于流的实现,它使用相同的模型。另一方面,根据您所拥有的操作,Stream API 可能会使用一些有状态数据来进行有状态中间操作,例如 、HashSet/ConcurrentHashMap或单个元素等。