在java 8中从并行流中收集

Vip*_*yal 16 java java.util.concurrent java-8 java-stream

我想获取输入并在其上应用并行流,然后我想输出为列表.输入可以是我们可以应用流的任何列表或任何集合.

我担心的是,如果我们想要输出作为映射它们,我们有一个来自java的选项就像

list.parallelStream().collect(Collectors.toConcurrentMap(args))
Run Code Online (Sandbox Code Playgroud)

但是我没有选择以线程安全的方式从并行流中收集以提供列表作为输出.我看到另外一个选项可供使用

list.parallelStream().collect(Collectors.toCollection(<Concurrent Implementation>))

通过这种方式,我们可以在collect方法中提供各种并发实现.但我认为java.util.concurrent中只存在CopyOnWriteArrayList List实现.我们可以在这里使用各种队列实现,但那些不会像列表一样.我的意思是我们可以解决这个问题.

如果我想要输出列表,你能指导一下最好的方法吗?

注意:我找不到与此相关的任何其他帖子,任何参考都会有所帮助.

And*_*eas 27

Collection用于接收正在收集的数据的对象不需要是并发的.你可以给它一个简单的ArrayList.

这是因为并行流中的值集合实际上并未收集到单个Collection对象中.每个线程将收集自己的数据,然后所有子结果将合并为单个最终Collection对象.

这在Collectorjavadoc中都有详细记录,并且Collector是您为collect()方法提供的参数:

<R,A> R collect(Collector<? super T,A,R> collector)
Run Code Online (Sandbox Code Playgroud)

  • @VipulGoyal这显然是出于优化目的.合并大的`HashMap`可能非常昂贵,并且`ConcurrentHashMap`在他们实现流时已经存在,那么为什么不使用呢? (2认同)
  • @VipulGoyal如果流(输入)和集合(输出)都是有序的,则并发集合将无济于事,因为必须按顺序收集值.但是,如果不必维护订单,并且集合是并发的,那么所有并行线程都可以添加到单个结果集合中,而不是通过构建需要合并的中间子结果. (2认同)

Eug*_*ene 7

But there is no option that I can see to collect from parallel stream in thread safe way to provide list as output.这是完全错误的.

流中的重点是,您可以使用非线程安全的集合来实现完全有效的线程安全结果.这是因为流的实现方式(这是流设计的关键部分).您可以看到a Collector定义了一个方法supplier,在每个步骤中都将创建一个新实例.这些实例将在它们之间合并.

所以这是完全线程安全的:

 Stream.of(1,2,3,4).parallel()
          .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

由于此流中有4个元素,因此将ArrayList创建4个实例,这些实例将在最后合并为单个结果(假设至少有4个CPU核心)

另一方面,toConcurrent生成单个结果容器和所有线程的方法会将结果放入其中.

  • ...假设至少有 5 个 CPU 核心!Stream 默认使用 ForkJoinPool 和 ForkJoinPool.commonPool() 大小默认为 Runtime.getRuntime().availableProcessors() - 1 (2认同)