Java flatmap Iterator <Pair <Stream <A>,Stream <B >>>对<Stream <A>,Stream <B >>

Msh*_*nik 14 java java-8 java-stream

我正在尝试使用以下签名实现一个方法:

public static <A,B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>>> iterator);
Run Code Online (Sandbox Code Playgroud)

方法的目标是将每个流类型展平为单个流并将输出包装成一对.我只有一个迭代器(不是Iterable),我不能改变方法签名,所以我必须在一次迭代中执行展平.

我目前最好的实施是

public static <A,B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>> iterator) {
    Stream<A> aStream = Stream.empty();
    Stream<B> bStream = Stream.empty();
    while(iterator.hasNext()) {
        Pair<Stream<A>, Stream<B>> elm = iterator.next();
        aStream = Stream.concat(aStream, elm.first);
        bStream = Stream.concat(bStream, elm.second);
    }
    return Pair.of(aStream, bStream);
}
Run Code Online (Sandbox Code Playgroud)

虽然这在技术上是正确的,但我对此并不十分满意,原因有两个:

  1. Stream.concat警告不要这样做,因为它可能导致StackOverflowError.
  2. 在风格上,我宁愿它是纯粹的功能,如果可能的话,而不是必须遍历迭代器并重新分配流.

感觉Stream#flatMap应该适合这里(在使用Guava的Streams.stream(Iterator)将输入Iterator转换为Stream之后,但由于中间的Pair类型,它似乎不起作用.

另外一个要求是任何迭代器/流可能非常大(例如,输入可能包含从一对极大的流到一个项目流中的许多流的任何地方),因此理想情况下,解决方案不应包含收集结果. - 记忆收藏.

Eug*_*ene 10

那番石榴Streams.stream没有魔力,它实际上只是内部:

StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false);
Run Code Online (Sandbox Code Playgroud)

因此,在您可以直接使用它时,可能无需将其链接到您的方法.

你可以Stream.Builder只使用它:

public static <A, B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {

    Stream.Builder<Stream<A>> builderA = Stream.builder();
    Stream.Builder<Stream<B>> builderB = Stream.builder();

    iterator.forEachRemaining(pair -> {
        builderA.add(pair.first);
        builderB.add(pair.second);
    });

    return Pair.of(builderA.build().flatMap(Function.identity()), builderB.build().flatMap(Function.identity()));
}
Run Code Online (Sandbox Code Playgroud)


Did*_*r L 7

避免收集整体Iterator(就像你在问题中实际做的那样)是非常困难的,因为你不知道如何消耗产生的流:一个可能被完全消耗,需要完全消耗迭代器,而另一个是根本没有消耗,需要跟踪所有产生的对 - 有效地将它们收集到某个地方.

只有当流以"速度"或多或少地消耗时,您才可以从不收集整个迭代器中受益.但是这种消耗意味着要么使用其中一个结果流的迭代器,要么在并行线程中使用流 - 这需要额外的同步.

因此,我建议将所有对收集到一个List,然后Pair从该列表中生成新的:

public static <A,B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {
    Iterable<Pair<Stream<A>, Stream<B>>> iterable = () -> iterator;
    final List<Pair<Stream<A>, Stream<B>>> allPairs =
        StreamSupport.stream(iterable.spliterator(), false)
            .collect(Collectors.toList());

    return Pair.of(
            allPairs.stream().flatMap(p -> p.first),
            allPairs.stream().flatMap(p -> p.second)
    );
}
Run Code Online (Sandbox Code Playgroud)

这还不消耗任何原始流,同时保留一个避免嵌套流连接的简单解决方案.