Jen*_*olm 1 java java-8 java-stream
我有一个Java 8流的问题,其中数据是以突然的批量处理,而不是在请求时.我有一个相当复杂的流 - 流,必须并行化,因为我concat用来合并两个流.
我的问题源于这样一个事实,即数据似乎在大量分钟内 - 有时甚至是数小时 - 被解析.我希望一旦Stream读取传入数据就会发生这种处理,以分散工作量.批量处理几乎在所有方面都是违反直觉的.
所以,问题是为什么这个批量收集发生以及如何避免它.
我的输入是一个未知大小的Spliterator,我使用forEach作为终端操作.
这是并行流的基本原则,遭遇顺序不必与处理顺序相匹配.如果需要,这使得能够在组装正确排序的结果的同时处理子列表或子树的项目.这明确允许批量处理,甚至使其成为有序流的并行处理的必需.
此行为是由特定的实施的决定Spliterator的trySplit实施.该规范说:
如果此Spliterator是
ORDERED,则返回的Spliterator必须覆盖元素的严格前缀...
API注意:
trySplit有效(无遍历)的理想方法将其元素精确地分成两半,从而实现平衡并行计算.
为什么这个策略在规范中固定而不是,例如偶数/奇数分裂?
好吧,考虑一个简单的用例.列表将被过滤并收集到新列表中,因此必须保留遭遇顺序.使用前缀规则,它很容易实现.拆分前缀,同时过滤两个块,然后将前缀过滤的结果添加到新列表,然后添加过滤后缀.
有了甚至奇怪的策略,这是不可能的.您可以同时过滤这两个部分,但之后,除非在整个操作过程中跟踪每个项目的位置,否则您不知道如何正确加入结果.
即使这样,加入这些齿轮加工项目也比执行addAll每个加工单元要复杂得多.
您可能已经注意到,如果您有可能必须保留的遭遇订单,则这一切都只适用.如果您的分裂器没有报告ORDERED特征,则不需要返回前缀.尽管如此,您可能继承的默认实现AbstractSpliterator旨在与有序的spliterator兼容.因此,如果您需要不同的策略,则必须自己实现拆分操作.
或者您使用不同的方式来实现无序流,例如
Stream.generate(()->{
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
return Thread.currentThread().getName();
}).parallel().forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)
可能更接近你的预期.
| 归档时间: |
|
| 查看次数: |
184 次 |
| 最近记录: |