如何正确地将流减少到另一个流

rus*_*tot 4 java java-8 java-stream

我有字符串和null的流

Stream<String> str1 = Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null);
Run Code Online (Sandbox Code Playgroud)

我想将它减少到另一个流,其中任何非空字符串序列连接在一起,即喜欢

Stream<String> str2 = Stream.of("ABC", "", "D", "EF","G")
Run Code Online (Sandbox Code Playgroud)

第一种方式,我发现 - 创建收集器,首先将完整的输入流减少到单个对象,并列出所有连接的字符串,然后从中创建新的流:

class Acc1 {
  final private List<String> data = new ArrayList<>();
  final private StringBuilder sb = new StringBuilder();

  private void accept(final String s) {
    if (s != null) 
      sb.append(s);
    else {
      data.add(sb.toString());
      sb.setLength(0);
    }
  }

  public static Collector<String,Acc1,Stream<String>> collector() {
    return Collector.of(Acc1::new, Acc1::accept, (a,b)-> a, acc -> acc.data.stream());
  }
}
...
Stream<String> str2 = str.collect(Acc1.collector());
Run Code Online (Sandbox Code Playgroud)

但在这种情况下,如果使用str2,即使作为str2.findFirst(),输入流也将被完全处理.消耗时间和内存的操作以及来自某些生成器的无限流,它根本不起作用

另一种方法 - 创建外部对象,保持中间状态并在flatMap()中使用它:

class Acc2 {
  final private StringBuilder sb = new StringBuilder();

  Stream<String> accept(final String s) {
    if (s != null) {
      sb.append(s);
      return Stream.empty();
    } else {
      final String result = sb.toString();
      sb.setLength(0);
      return Stream.of(result);
    }
  }
}
...
Acc2 acc = new Acc2();
Stream<String> str2 = str1.flatMap(acc::accept);
Run Code Online (Sandbox Code Playgroud)

在这种情况下,从str1将只检索通过str2真正访问的elemets.

但是使用在流处理之外创建的外部对象对我来说看起来很难看,并且可能会导致一些副作用,我现在还没有看到.此外,如果str2稍后将与parallelStream()一起使用,则会导致不可预测的结果.

如果没有这些缺陷,是否还有更正确的stream-> stream reduction的实现?

Hol*_*ger 5

减少或其可变变体,collect始终是将处理所有项目的操作.您的操作可以通过自定义实现Spliterator,例如

public static Stream<String> joinGroups(Stream<String> s) {
    Spliterator<String> sp=s.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<String>(sp.estimateSize(), 
        sp.characteristics()&Spliterator.ORDERED | Spliterator.NONNULL) {
            private StringBuilder sb = new StringBuilder();
            private String last;

            public boolean tryAdvance(Consumer<? super String> action) {
                if(!sp.tryAdvance(str -> last=str))
                    return false;
                while(last!=null) {
                    sb.append(last);
                    if(!sp.tryAdvance(str -> last=str)) break;
                }
                action.accept(sb.toString());
                sb=new StringBuilder();
                return true;
            }
        }, false);
}
Run Code Online (Sandbox Code Playgroud)

它可以生成预期的组,您可以使用它进行测试

joinGroups(Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null))
    .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

但也有所需的懒惰行为,可测试通过

joinGroups(
    Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null)
          .peek(str -> System.out.println("consumed "+str))
).skip(1).filter(s->!s.isEmpty()).findFirst().ifPresent(System.out::println);
Run Code Online (Sandbox Code Playgroud)

经过第二次思考,我来到了这个稍微有效的变体.StringBuilder如果至少有两个Strings要加入,它将包含唯一的,否则,它将只使用已存在的唯一String实例或""空字符串的文字字符串:

public static Stream<String> joinGroups(Stream<String> s) {
    Spliterator<String> sp=s.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<String>(sp.estimateSize(), 
        sp.characteristics()&Spliterator.ORDERED | Spliterator.NONNULL) {
            private String next;

            public boolean tryAdvance(Consumer<? super String> action) {
                if(!sp.tryAdvance(str -> next=str))
                    return false;
                String string=next;
                if(string==null) string="";
                else if(sp.tryAdvance(str -> next=str) && next!=null) {
                    StringBuilder sb=new StringBuilder().append(string);
                    do sb.append(next);while(sp.tryAdvance(str -> next=str) && next!=null);
                    string=sb.toString();
                }
                action.accept(string);
                return true;
            }
        }, false);
}
Run Code Online (Sandbox Code Playgroud)


Tag*_*eev 5

使用标准Stream API实现此类方案非常困难.在我的免费StreamEx库中,我使用允许执行所谓的"部分缩减"的方法扩展了标准Stream接口,这正是这里所必需的:

StreamEx<String> str1 = StreamEx.of("A","B","C",null,null,"D",null,"E","F",null,"G",null);
Stream<String> str2 = str1.collapse((a, b) -> a != null,
                          MoreCollectors.filtering(Objects::nonNull, Collectors.joining()));
str2.map(x -> '"'+x+'"').forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

输出:

"ABC"
""
"D"
"EF"
"G"
Run Code Online (Sandbox Code Playgroud)

StreamEx.collapse()方法使用所提供的收集器执行流的部分减少.第一个参数是一个谓词,它应用于两个相邻的原始项,如果它们必须一起缩小,则应该返回true.这里我们只要求对中的第一个不是null((a, b) -> a != null):这意味着每个组都以此结束,null并且新组在此处开始.现在我们需要将组合字母连接起来:这可以通过标准Collectors.joining()收集器完成.但是我们也需要过滤掉null.我们可以使用MoreCollectors.filteringcollector来实现(实际上,同样的收集器将在Collectors类的Java 9中提供).

这种实现完全是懒惰的,并且对并行处理非常友好.