Java并行流:如何等待并行流的线程完成?

Sim*_*imo 10 java collections parallel-processing multithreading java-stream

所以我有一个列表,我从中获得一个并行流来填写地图,如下所示:

Map<Integer, TreeNode> map = new HashMap<>();
List<NodeData> list = some_filled_list;

//Putting data from the list into the map
list.parallelStream().forEach(d -> {
                TreeNode node = new TreeNode(d);
                map.put(node.getId(), node);
            });

//print out map
map.entrySet().stream().forEach(entry -> {
     System.out.println("Processing node with ID = " + entry.getValue().getId());
                });
Run Code Online (Sandbox Code Playgroud)

这段代码的问题在于,当"放置数据"过程仍在进行时(因为它是并行的),地图正在被打印出来,因此,地图尚未从列表中接收到所有元素.当然,在我的真实代码中,不仅仅是打印出地图; 我使用地图来利用O(1)查找时间.

我的问题是:

  1. 如何使主线程等待,以便在打印出地图之前完成"放置数据"?我试图把"把数据"线程T内,做t.start()t.join(),但是这并没有帮助.

  2. 也许在这种情况下我不应该使用并行流?列表很长,我只想利用并行性来提高效率.

Eug*_*ene 12

这样list.parallelStream().forEach您就违反side-effects了Stream文档中明确声明的属性.

此外,当你说这个代码是在"放置数据"过程仍在进行时打印出来的地图(因为它是并行的),这不是真的,forEach终端操作也是如此,它将等待完成,直到它可以成为下一行的过程.您可能会看到这样,因为您正在收集非线程安全的,HashMap并且某些条目可能不在该映射中...考虑其他方式,如果您将多个条目放入多个线程中会发生什么情况HashMap?好吧,很多东西可能会破坏,比如丢失条目,不正确/不一致的地图等等.

当然,将其更改为a ConcurrentHashMap将起作用,因为它是线程安全的,但您仍然违反了副作用属性,尽管是以"安全"的方式.

做正确的事情是collect一个到Map的情况下直接forEach:

Map<Integer, TreeNode> map = list.parallelStream()
        .collect(Collectors.toMap(
                NodeData::getId,
                TreeNode::new
        ));
Run Code Online (Sandbox Code Playgroud)

这样,即使是并行处理,一切都会好的.请注意,您需要大量(数万个元素)才能从并行处理中获得可测量的性能提升.

  • 提到平行流并不是任何事情的圣杯 (2认同)
  • 实际上发生在现实生活中的HashMap不一致的最有趣的例子之一就是遇到一个无限循环,显然有节点循环链接,这在用心态看代码时是不可能的.顺序执行.哦,`list.parallelStream().collect(Collectors.toMap(NodeData :: getId,TreeNode :: new))`是无论元素数量多少都无法从并行中获取的场景之一,因为合并成本与任何潜在的储蓄相当. (2认同)

小智 5

流操作将阻塞,直到并行和非并行实现完成为止。

所以你看到的不是the "putting data" process is still going on——很可能只是数据损坏,因为HashMap不是线程安全的。尝试使用ConcurrentHashMap替代。