为什么ParallelStream不会在递归中使用所有commonPool的线程?

Roy*_*Ash 7 java multithreading java-8 java-stream java-17

当我运行以下代码时,8 个可用线程中只有 2 个可以运行,任何人都可以解释为什么会出现这种情况吗?我怎样才能改变代码,使其能够利用所有 8 个线程?

Tree.java

package il.co.roy;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class Tree<T>
{
    private final T data;
    private final Set<Tree<T>> subTrees;

    public Tree(T data, Set<Tree<T>> subTrees)
    {
        this.data = data;
        this.subTrees = subTrees;
    }

    public Tree(T data)
    {
        this(data, new HashSet<>());
    }

    public Tree()
    {
        this(null);
    }

    public T getData()
    {
        return data;
    }

    public Set<Tree<T>> getSubTrees()
    {
        return subTrees;
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        Tree<?> tree = (Tree<?>) o;
        return Objects.equals(data, tree.data) &&
                Objects.equals(subTrees, tree.subTrees);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(data, subTrees);
    }

    @Override
    public String toString()
    {
        return "Tree{" +
                "data=" + data +
                ", subTrees=" + subTrees +
                '}';
    }

    public void sendCommandAll()
    {
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] sending command to " + data);
        try
        {
            Thread.sleep(5000);
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] tree with data " + data + " got " + true);
        subTrees.parallelStream()
//              .map(Tree::sendCommandAll)
                .forEach(Tree::sendCommandAll);
//              .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2);
    }
}
Run Code Online (Sandbox Code Playgroud)

forEach(我是否使用或并不重要reduce)。

Main.java

package il.co.roy;

import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Main
{
    public static void main(String... args)
    {
        System.out.println("Processors: " + Runtime.getRuntime().availableProcessors());


        final Tree<Integer> root = new Tree<>(null,
                Set.of(new Tree<>(1,
                        IntStream.range(2, 7)
                                        .boxed()
                                        .map(Tree::new)
                                        .collect(Collectors.toSet()))));

        root.sendCommandAll();

//      IntStream.generate(() -> 1)
//              .parallel()
//              .forEach(i ->
//              {
//                  System.out.println(Thread.currentThread().getName());
//                  try
//                  {
//                      Thread.sleep(5000);
//                  } catch (InterruptedException e)
//                  {
//                      e.printStackTrace();
//                  }
//              });
    }
}
Run Code Online (Sandbox Code Playgroud)

在该main方法中,我创建了具有以下结构的树:\

root (data is `null`)
  |- 1
     |- 2
     |- 3
     |- 4
     |- 5
     |- 6
Run Code Online (Sandbox Code Playgroud)

sendCommandAll仅当其父级完成处理时,函数才处理每个子树(并行)。但结果如下:

处理器: 8
[main] 将命令发送到 1
[main] 树,数据 1 为 true
[main] 将命令发送到 6
[ForkJoinPool.commonPool-worker-2] 将命令发送到 5
[main] 树,数据 6 为 true
[ForkJoinPool .commonPool-worker-2] 数据为 5 的树得到 true
[ForkJoinPool.commonPool-worker-2] 发送命令到 4
[ForkJoinPool.commonPool-worker-2] 数据为 4 的树得到 true
[ForkJoinPool.commonPool-worker-2]向包含数据 3 的 3 个 [ForkJoinPool.commonPool-worker-2] 树发送命令
得到 true
[ForkJoinPool.commonPool-worker-2] 向
包含数据 2 的 2 [ForkJoinPool.commonPool-worker-2] 树发送命令得到 true

(郑重声明,当我执行 中的注释代码时Main.java,JVM 使用所有 7 (+ 1) 个可用线程commonPool

我该如何改进我的代码?

Hol*_*ger 4

正如本答案后半部分所解释的,处理HashMaps 或s时的线程利用率HashSet取决于后备数组中元素的分布,而后备数组又取决于哈希码。特别是与(默认)容量相比,元素数量较少,这可能会导致不良的工作分配。

一个简单的解决方法是使用new ArrayList<>(subTrees).parallelStream()而不是subTrees.parallelStream().

但请注意,您的方法在处理子节点之前执行当前节点的实际工作(在使用 a 模拟的示例中sleep),这也减少了潜在的并行性。

您可以使用

public void sendCommandAll() {
    if(subTrees.isEmpty()) {
        actualSendCommand();
        return;
    }
    List<Tree<T>> tmp = new ArrayList<>(subTrees.size() + 1);
    tmp.addAll(subTrees);
    tmp.add(this);
    tmp.parallelStream().forEach(t -> {
        if(t != this) t.sendCommandAll(); else t.actualSendCommand();
    });
}

private void actualSendCommand() {
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
                         + "] sending command to " + data);
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
                         + "] tree with data " + data + " got " + true);
}
Run Code Online (Sandbox Code Playgroud)

这允许在处理子节点的同时处理当前节点。

  • 您阅读了链接的答案吗?面临的挑战是在不花费太多时间分析情况的情况下分解工作。我调试了你的情况,元素确实都聚集在数组的一个区域中,因此将数组拆分为大小相等的范围会导致几个全空范围。`TreeMap` 不应该有这个问题,因为节点已经(几乎)平衡了,这允许递归地将一半传递给另一个工作线程,但是,它不是完美平衡的,这仍然可能导致小集合的并行性较低。 (3认同)
  • @RoyAsh 尝试 LinkedList,情况更糟。它是分裂启发式、集合拓扑以及 Spliterator 实现质量的复杂函数。 (3认同)
  • 然后,简单的解决方案“new ArrayList&lt;&gt;(subTrees).parallelStream()”就可以了。我将为未来可能没有此限制的读者保留其他解决方案...... (2认同)
  • @RoyAsh这不是一个错误,也不是特定于基于哈希的集合。并行处理是一种权衡;分裂和合并是有成本的,我们希望通过投入更多的 CPU 来解决这个问题。一般来说,拆分为一个元素并不是最佳的(并且,在小数据集上使用并行性也不是最佳的。)拆分启发式方法经过调整,可以在计算很大程度上受 CPU 限制的大型数据集上实现有效的并行性。 (2认同)
  • @RoyAsh `ArrayList` 知道所有元素都存储在其支持数组中,从索引 0 到 4。将它们分发给工人很容易。但是“HashSet”默认情况下有一个长度为 16 的后备数组,并且五个元素根据它们的哈希码存储在该数组的*某处*。然后,它获得任务“为另一个工作人员选择(大约)一半”*而无需实际迭代*。请注意,该算法也必须适用于 1 亿长度的数组。因此,它需要该数组的一半,希望能够捕获接近一半的元素。然后,每个工作人员递归地重复此操作。 (2认同)