In Java, how do I efficiently and elegantly stream a tree node's descendants?

use*_*730 18 java algorithm java-8 java-stream

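Assume we have a collection of objects identified by unique Strings, along with a class Tree that defines a hierarchy on them. That class is implemented using a Map from nodes (represented by their IDs) to Collections of their respective children's IDs.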

class Tree {
  private Map<String, Collection<String>> edges;

  // ...

  public Stream<String> descendants(String node) {
    // To be defined.
  }
}

I would like to enable streaming a node's descendants. A simple solution is this:

private Stream<String> children(String node) {
    return edges.getOrDefault(node, Collections.emptyList()).stream();
}

public Stream<String> descendants(String node) {
    return Stream.concat(
        Stream.of(node),
        children(node).flatMap(this::descendants)
    );
}

Before continuing, I would like to make the following assertions about this solution. (Am I correct about these?)

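  1. Walking the Stream returned from descendants consumes resources (time and memory), relative to the size of the tree, in the same order of complexity as hand-coding the recursion would. In particular, the intermediate objects representing the iteration state (Streams, Spliterators, ...) form a stack, so the memory requirement at any given time is in the same order of complexity as the tree's depth.

  2. As I understand it, as soon as I perform a terminal operation on the Stream returned from descendants, the root-level call to flatMap causes all contained Streams (one for each recursive call to descendants) to be realized immediately. Thus the resulting Stream is lazy only on the first level of recursion, but not beyond. (Edited according to Tagir Valeev's answer.)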

If I have understood these points correctly, my question is this: how can I define descendants so that the resulting Stream is lazy?

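I would like the solution to be as elegant as possible, in the sense that I prefer a solution which leaves the iteration state implicit. (To clarify what I mean by that: I know that I could write a Spliterator that walks the tree while maintaining an explicit stack of Spliterators, one per level. I would like to avoid that.)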
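For reference, the explicit-stack variant mentioned above could look roughly like the following. This is only a minimal sketch under my own assumptions: the class name ExplicitStackTree, the sample() data, and the childLookups counter are hypothetical and exist purely to make the laziness observable. It keeps a stack of child Iterators (rather than Spliterators) and looks up a node's children only when that node is emitted.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class ExplicitStackTree {
    private final Map<String, Collection<String>> edges = new HashMap<>();
    int childLookups = 0; // hypothetical instrumentation: counts child lookups

    // Hypothetical sample data, only for illustration.
    static ExplicitStackTree sample() {
        ExplicitStackTree t = new ExplicitStackTree();
        t.edges.put("1", Arrays.asList("2", "3", "4"));
        t.edges.put("2", Arrays.asList("5", "6", "7"));
        t.edges.put("3", Arrays.asList("8"));
        return t;
    }

    private Iterator<String> childIterator(String node) {
        childLookups++;
        return edges.getOrDefault(node, Collections.emptyList()).iterator();
    }

    // Pre-order traversal; the iteration state lives in an explicit stack.
    Stream<String> descendants(String root) {
        Deque<Iterator<String>> stack = new ArrayDeque<>();
        stack.push(Collections.singletonList(root).iterator());
        Spliterator<String> sp = new Spliterators.AbstractSpliterator<String>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super String> action) {
                // Drop exhausted levels until one still has a node to offer.
                while (!stack.isEmpty() && !stack.peek().hasNext()) stack.pop();
                if (stack.isEmpty()) return false;
                String node = stack.peek().next();
                stack.push(childIterator(node)); // children are visited next
                action.accept(node);
                return true;
            }
        };
        return StreamSupport.stream(sp, false);
    }

    public static void main(String[] args) {
        ExplicitStackTree t = sample();
        System.out.println(t.descendants("1").limit(3).collect(Collectors.toList()));
        System.out.println("child lookups: " + t.childLookups);
    }
}
```

The stack depth is bounded by the depth of the tree, so the memory behaviour matches the first assertion, but the state is explicit, which is exactly what I would like to avoid.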

(Is there perhaps a way in Java to formulate this as a producer-consumer workflow, as one could in languages like Julia and Go?)

Hol*_*ger 16

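To me, your solution is already as elegant as possible, and its limited laziness is not your fault. The simplest solution is to wait until it gets fixed by the JRE developers. This has been done in Java 10.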
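If you want to see which behaviour your runtime has, a small check along the following lines may help. It is only a sketch: the class name FlatMapLazinessCheck and the element count of 1000 are my own choices. It counts how many elements of the inner stream are pulled before findFirst() is satisfied. On Java 10 or later I would expect a single element to be pulled, whereas Java 8 pushes the entire inner stream downstream first.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class FlatMapLazinessCheck {

    // Returns how many inner elements were pulled before findFirst() returned.
    static int pulledBeforeFirst() {
        AtomicInteger pulled = new AtomicInteger();
        Optional<Integer> first = Stream.of("outer")
            .flatMap(s -> IntStream.range(0, 1000)
                                   .boxed()
                                   .peek(i -> pulled.incrementAndGet()))
            .findFirst();
        if (!first.isPresent() || first.get() != 0)
            throw new IllegalStateException("unexpected result: " + first);
        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("inner elements pulled = " + pulledBeforeFirst());
    }
}
```

Note that this only exercises a flatMap followed directly by a short-circuiting terminal operation; it says nothing about pipelines that are traversed through spliterator() or iterator().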

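However, if this limited laziness of today's implementation really is a concern, it is perhaps time to solve it in a general way. Well, it does involve implementing a Spliterator, but not one specific to your task. Instead, it is a re-implementation of the flatMap operation, serving all cases where the limited laziness of the original implementation matters: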

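import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class FlatMappingSpliterator<E,S> extends Spliterators.AbstractSpliterator<E>
implements Consumer<S> {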

    static final boolean USE_ORIGINAL_IMPL
        = Boolean.getBoolean("stream.flatmap.usestandard");

    public static <T,R> Stream<R> flatMap(
        Stream<T> in, Function<? super T,? extends Stream<? extends R>> mapper) {

        if(USE_ORIGINAL_IMPL)
            return in.flatMap(mapper);

        Objects.requireNonNull(in);
        Objects.requireNonNull(mapper);
        return StreamSupport.stream(
            new FlatMappingSpliterator<>(sp(in), mapper), in.isParallel()
        ).onClose(in::close);
    }

    final Spliterator<S> src;
    final Function<? super S, ? extends Stream<? extends E>> f;
    Stream<? extends E> currStream;
    Spliterator<E> curr;

    private FlatMappingSpliterator(
        Spliterator<S> src, Function<? super S, ? extends Stream<? extends E>> f) {
        // actually, the mapping function can change the size to anything,
        // but it seems, with the current stream implementation, we are
        // better off with an estimate being wrong by magnitudes than with
        // reporting unknown size
        super(src.estimateSize()+100, src.characteristics()&ORDERED);
        this.src = src;
        this.f = f;
    }

    private void closeCurr() {
        try { currStream.close(); } finally { currStream=null; curr=null; }
    }

    public void accept(S s) {
        curr=sp(currStream=f.apply(s));
    }

    @Override
    public boolean tryAdvance(Consumer<? super E> action) {
        do {
            if(curr!=null) {
                if(curr.tryAdvance(action))
                    return true;
                closeCurr();
            }
        } while(src.tryAdvance(this));
        return false;
    }

    @Override
    public void forEachRemaining(Consumer<? super E> action) {
        if(curr!=null) {
            curr.forEachRemaining(action);
            closeCurr();
        }
        src.forEachRemaining(s->{
            try(Stream<? extends E> str=f.apply(s)) {
                if(str!=null) str.spliterator().forEachRemaining(action);
            }
        });
    }

    @SuppressWarnings("unchecked")
    private static <X> Spliterator<X> sp(Stream<? extends X> str) {
        return str!=null? ((Stream<X>)str).spliterator(): null;
    }

    @Override
    public Spliterator<E> trySplit() {
        Spliterator<S> split = src.trySplit();
        if(split==null) {
            Spliterator<E> prefix = curr;
            while(prefix==null && src.tryAdvance(s->curr=sp(f.apply(s))))
                prefix=curr;
            curr=null;
            return prefix;
        }
        FlatMappingSpliterator<E,S> prefix=new FlatMappingSpliterator<>(split, f);
        if(curr!=null) {
            prefix.curr=curr;
            curr=null;
        }
        return prefix;
    }
}

All you need to do to use it is add an import static of the flatMap method to your code and change expressions of the form stream.flatMap(function) to flatMap(stream, function).

I.e. in your code

public Stream<String> descendants(String node) {
    return Stream.concat(
        Stream.of(node),
        flatMap(children(node), this::descendants)
    );
}

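Then you have fully lazy behavior. I tested it even with infinite streams...

Note that I added a toggle that allows switching back to the original implementation, e.g. by specifying -Dstream.flatmap.usestandard=true on the command line.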
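How the toggle is read can be illustrated in isolation. Boolean.getBoolean does not parse its argument; it looks up the system property of that name and returns true only if the value equals "true", ignoring case. The class name TogglePropertyDemo below is hypothetical, and the property is set programmatically only to keep the sketch self-contained.

```java
class TogglePropertyDemo {

    // Same lookup as the USE_ORIGINAL_IMPL initializer uses.
    static boolean useStandard() {
        return Boolean.getBoolean("stream.flatmap.usestandard");
    }

    public static void main(String[] args) {
        System.out.println("before: " + useStandard());
        // Equivalent to passing -Dstream.flatmap.usestandard=true to the JVM.
        System.setProperty("stream.flatmap.usestandard", "true");
        System.out.println("after:  " + useStandard());
    }
}
```

Because USE_ORIGINAL_IMPL is a static final field, the property is read once, when FlatMappingSpliterator is initialized. Setting it afterwards has no effect on that class, which is why the command line is the natural place for it.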

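  • `Long.MAX_VALUE` is specified by the API as "unknown size", not "very large size". If the stream implementation interprets it differently, that is a bug. Since the function can return anything from an empty stream to an infinite stream, I don't think estimating any size is appropriate. The reasoning is also wrong: if subtasks finish faster because of an exception or short-circuiting, they should not receive new tasks, as the entire operation should end anyway. Nevertheless, for practical purposes, I added it. (3 upvotes)
  • Note that when raising the per-item overhead, as in `flatMap(IntStream.range(0,1000).boxed().parallel(), Stream::of).map(i -> {LockSupport.parkNanos(1); return i;}).collect(Collectors.summingInt(Integer::intValue));`, so that the operation *really* benefits from parallel execution, both implementations are on par. (2 upvotes)
  • I see. The real problem with my test was that I should go parallel *before* your flatMap, because adding an innocent `boxed()` already makes the source spliterator unsplittable. This works fine: `flatMap(IntStream.range(0,1000000).boxed().parallel(), Stream::of).collect(Collectors.summingInt(Integer::intValue))`. (2 upvotes)
  • Maybe you could have a look at [this](https://github.com/JavaChat/streems); it might help you. (2 upvotes)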

Tag*_*eev 5

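You're slightly wrong in saying that the flatMap stream is not lazy. It is somewhat lazy, though its laziness is really limited. Let's use a custom Collection to track the requested elements inside your Tree class: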

private final Set<String> requested = new LinkedHashSet<>();

private class MyList extends AbstractList<String> implements RandomAccess
{
    private final String[] data;

    public MyList(String... data) {
        this.data = data;
    }

    @Override
    public String get(int index) {
        requested.add(data[index]);
        return data[index];
    }

    @Override
    public int size() {
        return data.length;
    }
}

Now let's pre-initialize your class with some tree data:

public Tree() {
    // "1" is the root node, contains three immediate descendants
    edges.put("1", new MyList("2", "3", "4"));
    edges.put("2", new MyList("5", "6", "7"));
    edges.put("3", new MyList("8", "9", "10"));
    edges.put("8", new MyList("11", "12"));
    edges.put("5", new MyList("13", "14", "15"));
    edges.put("7", new MyList("16", "17", "18"));
    edges.put("6", new MyList("19", "20"));
}

Finally, let's check how many elements are actually requested from your lists for different limit values:

public static void main(String[] args) {
    for(int i=1; i<=20; i++) {
        Tree tree = new Tree();
        tree.descendants("1").limit(i).toArray();
        System.out.println("Limit = " + i + "; requested = (" + tree.requested.size()
                + ") " + tree.requested);
    }
}

The output is the following:

Limit = 1; requested = (0) []
Limit = 2; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 3; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 4; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 5; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 6; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 7; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 8; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 9; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 10; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 11; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 12; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 13; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 14; requested = (18) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18, 3, 8, 11, 12, 9, 10]
Limit = 15; requested = (18) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18, 3, 8, 11, 12, 9, 10]
Limit = 16; requested = (18) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18, 3, 8, 11, 12, 9, 10]
Limit = 17; requested = (18) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18, 3, 8, 11, 12, 9, 10]
Limit = 18; requested = (18) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18, 3, 8, 11, 12, 9, 10]
Limit = 19; requested = (18) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18, 3, 8, 11, 12, 9, 10]
Limit = 20; requested = (19) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18, 3, 8, 11, 12, 9, 10, 4]

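Thus, when only the root node is requested, no access to the children is performed (Stream.concat is smart about this). When the first immediate child is requested, the whole subtree of that child is processed, even if it is not needed. Nevertheless, the second immediate child is not processed until the first one is finished. This could be a problem in short-circuiting scenarios, but in most cases your terminal operation is not short-circuiting, so it is still a fine approach.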

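As for your concerns about memory consumption: yes, it consumes memory in proportion to the tree depth (and, more importantly, it consumes stack). If your tree has thousands of nesting levels, you may run into problems with your solution, as you may hit a StackOverflowError with the default -Xss setting. For several hundred levels of depth it works fine.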
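To get a feeling for the depth at which this starts to matter on a given JVM, you can traverse a degenerate tree, i.e. a single chain. The following is a rough sketch, not a benchmark: the class name DeepChainDemo and the depths 200 and 100000 are arbitrary choices of mine, and whether the deep run actually overflows depends on the JVM and its stack settings.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

class DeepChainDemo {
    private final Map<String, Collection<String>> edges = new HashMap<>();

    // Builds a degenerate tree: "0" -> "1" -> ... -> String.valueOf(depth).
    static DeepChainDemo chain(int depth) {
        DeepChainDemo tree = new DeepChainDemo();
        for (int i = 0; i < depth; i++) {
            tree.edges.put(String.valueOf(i),
                           Collections.singletonList(String.valueOf(i + 1)));
        }
        return tree;
    }

    private Stream<String> children(String node) {
        return edges.getOrDefault(node, Collections.emptyList()).stream();
    }

    // Same recursive definition as in the question.
    Stream<String> descendants(String node) {
        return Stream.concat(
            Stream.of(node),
            children(node).flatMap(this::descendants)
        );
    }

    static long countNodes(int depth) {
        return chain(depth).descendants("0").count();
    }

    public static void main(String[] args) {
        System.out.println("depth 200: " + countNodes(200) + " nodes");
        try {
            System.out.println("depth 100000: " + countNodes(100_000) + " nodes");
        } catch (StackOverflowError e) {
            // Likely with default stack sizes, but not guaranteed.
            System.out.println("depth 100000: StackOverflowError");
        }
    }
}
```

Each tree level costs several stack frames here, because every nested flatMap call goes through a number of internal stream methods before reaching descendants again.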

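We are using a similar approach in the business logic layer of our application, and it works fine for us, though our trees are rarely deeper than 10 levels.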