Ste*_*lvi 6 java concurrency binary-tree java-stream
我正在努力寻找一种从这个流中获得加速的正确方法:
StreamSupport.stream(new BinaryTreeSpliterator(root), true)
.parallel()
.map(node -> processor.onerousFunction(node.getValue()))
.mapToInt(i -> i.intValue())
.sum()
Run Code Online (Sandbox Code Playgroud)
onerousFunction() 只是一个使线程工作一点并返回节点的int值的函数.
无论我使用多少cpu,执行时间总是保持不变.我认为这个问题代表我写的Spliterator:
public class BinaryTreeSpliterator extends AbstractSpliterator<Node> {
private LinkedBlockingQueue<Node> nodes = new LinkedBlockingQueue<>();
public BinaryTreeSpliterator(Node root) {
super(Long.MAX_VALUE, NONNULL | IMMUTABLE);
this.nodes.add(root);
}
@Override
public boolean tryAdvance(Consumer<? super Node> action) {
Node current = this.nodes.poll();
if(current != null) {
action.accept(current);
if(current.getLeft() != null)
this.nodes.offer(current.getLeft());
if(current.getRight() != null)
this.nodes.offer(current.getRight());
return true;
}
return false;
}
}
Run Code Online (Sandbox Code Playgroud)
但我真的找不到一个好的解决方案.
要并行处理数据,您需要一个trySplit实现来将部分数据作为新Spliterator实例返回.每个单个线程遍历spliterator实例.因此,顺便提一下,你的分词器中不需要线程安全集合.但是你的问题是你继承了trySplit实现,AbstractSpliterator尽管不了解你的数据,但它确实尝试提供一些并行支持.
它通过顺序请求一些项目,将它们缓冲到一个数组并返回一个新的基于数组的spliterator来实现.不幸的是,它不能很好地处理"未知大小"(这同样适用于一般的并行流实现).默认情况下它会缓冲1024个元素,如果有多少元素,下次会缓冲更多.更糟糕的是,流实现不会使用基于数组的spliterator的良好分割功能,因为它像文字一样处理"未知大小" Long.MAX_VALUE,结论是你的分裂器比阵列中的1024个元素有更多的元素,因此,甚至不会尝试拆分基于数组的spliterator.
您的分裂器可以实现更合适的trySplit方法:
public class BinaryTreeSpliterator extends AbstractSpliterator<Node> {
/**
* a node that has not been traversed, but its children are only
* traversed if contained in this.pending
* (otherwise a different spliterator might be responsible)
*/
private Node pendingNode;
/** pending nodes needing full traversal */
private ArrayDeque<Node> pending = new ArrayDeque<>();
public BinaryTreeSpliterator(Node root) {
super(Long.MAX_VALUE, NONNULL | IMMUTABLE);
push(root);
}
private BinaryTreeSpliterator(Node pending, Node next) {
super(Long.MAX_VALUE, NONNULL | IMMUTABLE);
pendingNode = pending;
if(next!=null) this.pending.offer(next);
}
private void push(Node n) {
if(pendingNode == null) {
pendingNode = n;
if(n != null) {
if(n.getRight()!=null) pending.offerFirst(n.getRight());
if(n.getLeft() !=null) pending.offerFirst(n.getLeft());
}
}
else pending.offerFirst(n);
}
@Override
public boolean tryAdvance(Consumer<? super Node> action) {
Node current = pendingNode;
if(current == null) {
current = pending.poll();
if(current == null) return false;
push(current.getRight());
push(current.getLeft());
}
else pendingNode = null;
action.accept(current);
return true;
}
@Override
public void forEachRemaining(Consumer<? super Node> action) {
Node current = pendingNode;
if(current != null) {
pendingNode = null;
action.accept(current);
}
for(;;) {
current = pending.poll();
if(current == null) break;
traverseLocal(action, current);
}
}
private void traverseLocal(Consumer<? super Node> action, Node current) {
do {
action.accept(current);
Node child = current.getLeft();
if(child!=null) traverseLocal(action, child);
current = current.getRight();
} while(current != null);
}
@Override
public Spliterator<Node> trySplit() {
Node next = pending.poll();
if(next == null) return null;
if(pending.isEmpty()) {
pending.offer(next);
next = null;
}
if(pendingNode==null) return next==null? null: new BinaryTreeSpliterator(next);
Spliterator<Node> s = new BinaryTreeSpliterator(pendingNode, next);
pendingNode = null;
return s;
}
}
Run Code Online (Sandbox Code Playgroud)
请注意,此分词符也可以作为分ORDERED词符,保持左上角的顺序.完全无序的分裂器可以稍微简单一些地实现.
您可以实现forEachRemaining比继承默认值更有效的方法,例如
@Override
public void forEachRemaining(Consumer<? super Node> action) {
Node current = pendingNode;
if(current != null) {
pendingNode = null;
action.accept(current);
}
for(;;) {
current = pending.poll();
if(current == null) break;
traverseLocal(action, current);
}
}
private void traverseLocal(Consumer<? super Node> action, Node current) {
do {
action.accept(current);
Node child = current.getLeft();
if(child!=null) traverseLocal(action, child);
current = current.getRight();
} while(current != null);
}
Run Code Online (Sandbox Code Playgroud)
但是,如果您的应用程序必须处理不平衡树(特别是此示例中的非常长的左路径),则此方法可能会导致stackoverflow错误.