并行流,收集器和线程安全

ass*_*ias 39 java parallel-processing concurrency java-8 java-stream

请参阅下面的简单示例,该示例计算列表中每个单词的出现次数:

Stream<String> words = Stream.of("a", "b", "a", "c");
Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));
Run Code Online (Sandbox Code Playgroud)

最后,wordsCount{a=2, b=1, c=1}.

但我的流非常大,我想要并行工作,所以我写道:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));
Run Code Online (Sandbox Code Playgroud)

但是我注意到这wordsCount很简单HashMap所以我想知道我是否需要明确要求并发映射以确保线程安全:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toConcurrentMap(s -> s, s -> 1,
                                                                (i, j) -> i + j));
Run Code Online (Sandbox Code Playgroud)

非并发收集器是否可以安全地与并行流一起使用,还是在从并行流收集时只应使用并发版本?

Stu*_*rks 42

非并发收集器是否可以安全地与并行流一起使用,还是在从并行流收集时只应使用并发版本?

collect并行流的操作中使用非并发收集器是安全的.

在界面的规范Collector,在有六个子弹点的部分中,是这样的:

对于非并发收集器,从结果提供者,累加器或组合器函数返回的任何结果必须是串行线程限制的.这使得集合可以并行发生,而收集器无需实现任何其他同步.减少实现必须管理输入被正确分区,分区是单独处理的,并且只有在累积完成后才进行组合.

这意味着Collectors该类提供的各种实现可以与并行流一起使用,即使其中一些实现可能不是并发收集器.这也适用于您可能实现的任何非并发收集器.它们可以安全地与并行流一起使用,前提是您的收集器不会干扰流源,无副作用,顺序无关等.

我还建议阅读java.util.stream包文档中的Mutable Reduction部分.在本节的中间是一个声明可并行化的示例,但它将结果收集到一个ArrayList不是线程安全的.

这种方式的工作方式是以非并发收集器结尾的并行流确保不同的线程始终在中间结果集合的不同实例上运行.这就是为什么收集器有一个Supplier函数,用于创建与线程一样多的中间集合,因此每个线程都可以累积到它自己的集合中.当要合并中间结果时,它们在线程之间安全地切换,并且在任何给定时间只有一个线程合并任何一对中间结果.


Bri*_*etz 19

如果所有收集器遵循规范中的规则,则可以安全地并行或顺序运行.并行准备是这里设计的关键部分.

并发和非并发收集器之间的区别与并行化方法有关.

普通(非并发)收集器通过合并子结果来操作.因此,源被分成一堆块,每个块被收集到一个结果容器(如列表或映射)中,然后子结果被合并到一个更大的结果容器中.这是安全且保持订单的,但对于某些类型的容器 - 尤其是地图 - 可能很昂贵,因为按键合并两个地图通常很昂贵.

并发收集器会创建一个结果容器,其插入操作保证是线程安全的,并从多个线程中将元素压缩到其中.使用像ConcurrentHashMap这样的高度并发的结果容器,这种方法可能比合并普通的HashMaps更好.

因此,并发收集器严格优于普通收集器.而且他们没有成本; 因为元素是从许多线程中被抨击的,所以并发收集器通常不能保留遭遇顺序.(但是,通常你不关心 - 在创建单词计数直方图时,你不关心你首先计算的"foo"实例.)

  • @Noumenon 3 和 4/5 的区别是 `groupingBy` 和 `groupingByConcurrent` 的区别。前者保证顺序保存,并且是并行安全的,但可能更慢。后者也是并行安全的,通常并行化更好,但牺牲了顺序保持。程序员必须做出权衡。 (2认同)

nos*_*sid 10

使用非并发集合和非原子计数器与并行流是安全的.

如果你看一下Stream :: collect的文档,你会发现以下段落:

同样reduce(Object, BinaryOperator),可以并行化收集操作,而无需额外的同步.

对于方法Stream :: reduce:

虽然与简单地在循环中改变运行总计相比,这似乎是更加迂回的执行聚合的方式,但是还原操作可以更优雅地并行化,而无需额外的同步并且大大降低了数据竞争的风险.

这可能有点令人惊讶.但请注意,并行流基于fork-join模型.这意味着并发执行的工作方式如下:

  • 将序列分成两个大小相同的部分
  • 单独处理每个部分
  • 收集两个部分的结果并将它们组合成一个结果

在第二步中,将三个步骤递归地应用于子序列.

一个例子应该清楚.该

IntStream.range(0, 4)
    .parallel()
    .collect(Trace::new, Trace::accumulate, Trace::combine);
Run Code Online (Sandbox Code Playgroud)

Trace类的唯一目的是记录构造函数和方法调用.如果执行此语句,则会打印以下行:

thread:  9  /  operation: new
thread: 10  /  operation: new
thread: 10  /  operation: accumulate
thread:  1  /  operation: new
thread:  1  /  operation: accumulate
thread:  1  /  operation: combine
thread: 11  /  operation: new
thread: 11  /  operation: accumulate
thread:  9  /  operation: accumulate
thread:  9  /  operation: combine
thread:  9  /  operation: combine
Run Code Online (Sandbox Code Playgroud)

你可以看到,四个跟踪已创建的对象,积聚已在每个对象上调用一次,并联合收割机已使用三次四个对象合二为一.每个对象一次只能由一个线程访问.这使代码线程安全,同样适用于Collectors :: toMap方法.