Roy*_*Ash 7 java multithreading java-8 java-stream java-17
当我运行以下代码时,8 个可用线程中只有 2 个可以运行,任何人都可以解释为什么会出现这种情况吗?我怎样才能改变代码,使其能够利用所有 8 个线程?
Tree.java
:
package il.co.roy;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
public class Tree<T>
{
private final T data;
private final Set<Tree<T>> subTrees;
public Tree(T data, Set<Tree<T>> subTrees)
{
this.data = data;
this.subTrees = subTrees;
}
public Tree(T data)
{
this(data, new HashSet<>());
}
public Tree()
{
this(null);
}
public T getData()
{
return data;
}
public Set<Tree<T>> getSubTrees()
{
return subTrees;
}
@Override
public boolean equals(Object o)
{
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Tree<?> tree = (Tree<?>) o;
return Objects.equals(data, tree.data) &&
Objects.equals(subTrees, tree.subTrees);
}
@Override
public int hashCode()
{
return Objects.hash(data, subTrees);
}
@Override
public String toString()
{
return "Tree{" +
"data=" + data +
", subTrees=" + subTrees +
'}';
}
public void sendCommandAll()
{
if (data != null)
System.out.println("[" + Thread.currentThread().getName() + "] sending command to " + data);
try
{
Thread.sleep(5000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
if (data != null)
System.out.println("[" + Thread.currentThread().getName() + "] tree with data " + data + " got " + true);
subTrees.parallelStream()
// .map(Tree::sendCommandAll)
.forEach(Tree::sendCommandAll);
// .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2);
}
}
Run Code Online (Sandbox Code Playgroud)
forEach
(我是否使用或并不重要reduce
)。
Main.java
:
package il.co.roy;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class Main
{
public static void main(String... args)
{
System.out.println("Processors: " + Runtime.getRuntime().availableProcessors());
final Tree<Integer> root = new Tree<>(null,
Set.of(new Tree<>(1,
IntStream.range(2, 7)
.boxed()
.map(Tree::new)
.collect(Collectors.toSet()))));
root.sendCommandAll();
// IntStream.generate(() -> 1)
// .parallel()
// .forEach(i ->
// {
// System.out.println(Thread.currentThread().getName());
// try
// {
// Thread.sleep(5000);
// } catch (InterruptedException e)
// {
// e.printStackTrace();
// }
// });
}
}
Run Code Online (Sandbox Code Playgroud)
在该main
方法中,我创建了具有以下结构的树:\
root (data is `null`)
|- 1
|- 2
|- 3
|- 4
|- 5
|- 6
Run Code Online (Sandbox Code Playgroud)
sendCommandAll
仅当其父级完成处理时,函数才处理每个子树(并行)。但结果如下:
处理器: 8
[main] 将命令发送到 1
[main] 树,数据 1 为 true
[main] 将命令发送到 6
[ForkJoinPool.commonPool-worker-2] 将命令发送到 5
[main] 树,数据 6 为 true
[ForkJoinPool .commonPool-worker-2] 数据为 5 的树得到 true
[ForkJoinPool.commonPool-worker-2] 发送命令到 4
[ForkJoinPool.commonPool-worker-2] 数据为 4 的树得到 true
[ForkJoinPool.commonPool-worker-2]向包含数据 3 的 3 个 [ForkJoinPool.commonPool-worker-2] 树发送命令
得到 true
[ForkJoinPool.commonPool-worker-2] 向
包含数据 2 的 2 [ForkJoinPool.commonPool-worker-2] 树发送命令得到 true
(郑重声明,当我执行 中的注释代码时Main.java
,JVM 使用所有 7 (+ 1) 个可用线程commonPool
)
我该如何改进我的代码?
正如本答案后半部分所解释的,处理HashMap
s 或s时的线程利用率HashSet
取决于后备数组中元素的分布,而后备数组又取决于哈希码。特别是与(默认)容量相比,元素数量较少,这可能会导致不良的工作分配。
一个简单的解决方法是使用new ArrayList<>(subTrees).parallelStream()
而不是subTrees.parallelStream()
.
但请注意,您的方法在处理子节点之前执行当前节点的实际工作(在使用 a 模拟的示例中sleep
),这也减少了潜在的并行性。
您可以使用
public void sendCommandAll() {
if(subTrees.isEmpty()) {
actualSendCommand();
return;
}
List<Tree<T>> tmp = new ArrayList<>(subTrees.size() + 1);
tmp.addAll(subTrees);
tmp.add(this);
tmp.parallelStream().forEach(t -> {
if(t != this) t.sendCommandAll(); else t.actualSendCommand();
});
}
private void actualSendCommand() {
if (data != null)
System.out.println("[" + Thread.currentThread().getName()
+ "] sending command to " + data);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (data != null)
System.out.println("[" + Thread.currentThread().getName()
+ "] tree with data " + data + " got " + true);
}
Run Code Online (Sandbox Code Playgroud)
这允许在处理子节点的同时处理当前节点。