Partition a Stream by a discriminator function

Mar*_*nik 19 java java-8 java-stream

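One feature missing from the Streams API is a "partition by" transformation, like the one defined in Clojure. Suppose I want to reproduce Hibernate's fetch join: I want to issue a single SQL SELECT statement and receive objects of this kind from the result: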

class Family {
   String surname;
   List<String> members;

   Family(String surname) { this.surname = surname; }
}

I issue:

SELECT f.name, m.name 
FROM Family f JOIN Member m on m.family_id = f.id
ORDER BY f.name

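I retrieve a flat stream of (f.name, m.name) records. Now I need to transform it into a stream of Family objects, each containing its list of members. Assume I already have a Stream<ResultRow>. I now need to transform it into a Stream<List<ResultRow>> and then apply a mapping transformation that turns it into a Stream<Family>.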

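The semantics of the transformation are as follows: keep collecting the stream into a List for as long as the provided discriminator function keeps returning the same value. As soon as the value changes, emit the List as an element of the output stream and start collecting a new List.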
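To make these semantics concrete, here is a minimal eager sketch over an in-memory List. It deliberately ignores laziness and only illustrates which elements end up together. The class name `PartitionBySemantics` and its `partitionBy` method are hypothetical. The sample rows are unsorted on purpose: "Smith" lands in two separate partitions, which is why the query needs ORDER BY f.name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

public class PartitionBySemantics {
    // Eager reference version of "partition by": illustrates the semantics only,
    // it materializes the whole input and output (NOT lazy).
    static <E> List<List<E>> partitionBy(Function<? super E, ?> key, List<E> in) {
        List<List<E>> out = new ArrayList<>();
        List<E> current = null;
        Object currentKey = null;
        for (E e : in) {
            Object k = key.apply(e);
            // Start a new partition on the first element or whenever the key changes.
            if (current == null || !Objects.equals(k, currentKey)) {
                current = new ArrayList<>();
                out.add(current);
                currentKey = k;
            }
            current.add(e);
        }
        return out;
    }

    public static void main(String[] args) {
        // "surname:member" strings stand in for the (f.name, m.name) rows.
        List<String> rows = Arrays.asList("Smith:Ann", "Smith:Bob", "Jones:Cy", "Smith:Dee");
        Function<String, String> surname = r -> r.substring(0, r.indexOf(':'));
        System.out.println(partitionBy(surname, rows));
        // prints [[Smith:Ann, Smith:Bob], [Jones:Cy], [Smith:Dee]]
    }
}
```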

I'd like to be able to write this kind of code (I already have the resultStream method):

Stream<ResultRow> dbStream = resultStream(queryBuilder.createQuery(
        "SELECT f.name, m.name"
      + " FROM Family f JOIN Member m on m.family_id = f.id"
      + " ORDER BY f.name"));
Stream<List<ResultRow>> partitioned = partitionBy(r -> r.string(0), dbStream);
Stream<Family> families = partitioned.map(rs -> {
                    Family f = new Family(rs.get(0).string(0));
                    f.members = rs.stream().map(r -> r.string(1)).collect(toList());
                    return f;
                 });

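Needless to say, I want the resulting stream to stay lazy (not materialized). I need to process a result set of any size without hitting an O(n) memory limit. Without this key requirement, I would be happy with the provided groupingBy collector.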
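For contrast, a minimal sketch of the groupingBy approach the question rules out, assuming a hypothetical `Row` class as a stand-in for the question's `ResultRow`. `Collectors.groupingBy` with a map factory and a `mapping(..., toList())` downstream is standard JDK API. A `LinkedHashMap` keeps families in encounter order.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GroupingByFamilies {
    // Hypothetical stand-in for ResultRow: (family name, member name).
    static final class Row {
        final String family, member;
        Row(String family, String member) { this.family = family; this.member = member; }
    }

    // Materializing approach: every row is accumulated into one Map before
    // anything is returned, so memory grows with the size of the result set.
    static Map<String, List<String>> membersByFamily(Stream<Row> rows) {
        return rows.collect(Collectors.groupingBy(
                r -> r.family,
                LinkedHashMap::new, // keep families in encounter (ORDER BY) order
                Collectors.mapping(r -> r.member, Collectors.toList())));
    }

    public static void main(String[] args) {
        Stream<Row> rows = Stream.of(
                new Row("Jones", "Cy"), new Row("Smith", "Ann"), new Row("Smith", "Bob"));
        System.out.println(membersByFamily(rows)); // {Jones=[Cy], Smith=[Ann, Bob]}
    }
}
```

Unlike a "partition by" transformation, groupingBy does not need the rows sorted by family. The trade-off is that it cannot return anything until the whole stream has been consumed.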

Mar*_*nik 13

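The solution requires us to define a custom Spliterator that can be used to construct the partitioned stream. We access the input stream through its own spliterator and wrap that spliterator in ours. The output stream is then constructed from our custom spliterator.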

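The following Spliterator turns any Stream<E> into a Stream<List<E>>, given a Function<E, ?> as the discriminator function. Note that the input stream must be ordered for this operation to make sense. Elements that share a discriminator value have to be adjacent, as ORDER BY f.name guarantees in the question.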

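import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static java.util.Comparator.naturalOrder;

public class PartitionBySpliterator<E> extends AbstractSpliterator<List<E>> {
  private final Spliterator<E> spliterator;
  private final Function<? super E, ?> partitionBy;
  // Holds the element read ahead from the source: the first element of the next partition.
  private HoldingConsumer<E> holder;
  private Comparator<List<E>> comparator;

  public PartitionBySpliterator(Spliterator<E> toWrap, Function<? super E, ?> partitionBy) {
    // The number of partitions is unknown, so clear SIZED and SUBSIZED; partitions are never null.
    super(Long.MAX_VALUE, toWrap.characteristics() & ~(SIZED | SUBSIZED) | NONNULL);
    this.spliterator = toWrap;
    this.partitionBy = partitionBy;
  }

  public static <E> Stream<List<E>> partitionBy(Function<? super E, ?> partitionBy, Stream<E> in) {
    return StreamSupport.stream(new PartitionBySpliterator<>(in.spliterator(), partitionBy), false)
                        .onClose(in::close); // closing the result also closes the source (e.g. a DB cursor)
  }

  @Override public boolean tryAdvance(Consumer<? super List<E>> action) {
    final HoldingConsumer<E> h;
    if (holder == null) {
      // Nothing read ahead (first call, or source exhausted): pull one element.
      h = new HoldingConsumer<>();
      if (!spliterator.tryAdvance(h)) return false;
      holder = h;
    }
    else h = holder;
    final ArrayList<E> partition = new ArrayList<>();
    final Object partitionKey = partitionBy.apply(h.value);
    boolean didAdvance;
    // Collect while the discriminator returns the same key; the first element
    // with a different key stays in the holder for the next call.
    do partition.add(h.value);
    while ((didAdvance = spliterator.tryAdvance(h))
        && Objects.equals(partitionBy.apply(h.value), partitionKey));
    if (!didAdvance) holder = null; // source exhausted
    action.accept(partition);
    return true;
  }

  static final class HoldingConsumer<T> implements Consumer<T> {
    T value;
    @Override public void accept(T value) { this.value = value; }
  }

  // Relevant only when the source reports SORTED (this class inherits that characteristic).
  @Override public Comparator<? super List<E>> getComparator() {
    final Comparator<List<E>> c = this.comparator;
    return c != null? c : (this.comparator = comparator());
  }

  // Orders partitions by their first element, then by their last element.
  private Comparator<List<E>> comparator() {
    @SuppressWarnings({"unchecked","rawtypes"})
    final Comparator<? super E> innerComparator =
        Optional.ofNullable(spliterator.getComparator())
                .orElse((Comparator) naturalOrder());
    return (left, right) -> {
      final int c = innerComparator.compare(left.get(0), right.get(0));
      return c != 0? c : innerComparator.compare(
          left.get(left.size() - 1), right.get(right.size() - 1));
    };
  }
}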
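For comparison, the same lazy semantics can also be sketched on top of `Stream.iterator()` with one read-ahead element. This is a different technique from the Spliterator above, not the answer's code. It gives up characteristic and comparator propagation and reports only ORDERED | NONNULL. The class name `IteratorPartitionBy` is hypothetical. The `main` method feeds it an infinite stream, and `limit(2)` can terminate only because partitions are built on demand.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorPartitionBy {
    // Lazy "partition by" built on Stream.iterator(): each partition is
    // assembled only when the downstream pipeline asks for the next element.
    static <E> Stream<List<E>> partitionBy(Function<? super E, ?> key, Stream<E> in) {
        Iterator<E> src = in.iterator();
        Iterator<List<E>> parts = new Iterator<List<E>>() {
            private E pending;          // first element of the next partition
            private boolean hasPending; // separate flag, since E may legitimately be null

            @Override public boolean hasNext() {
                return hasPending || src.hasNext();
            }

            @Override public List<E> next() {
                if (!hasNext()) throw new NoSuchElementException();
                E first = hasPending ? pending : src.next();
                hasPending = false;
                pending = null;
                Object k = key.apply(first);
                List<E> partition = new ArrayList<>();
                partition.add(first);
                while (src.hasNext()) {
                    E e = src.next();
                    if (!Objects.equals(key.apply(e), k)) {
                        pending = e; // belongs to the next partition
                        hasPending = true;
                        break;
                    }
                    partition.add(e);
                }
                return partition;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(parts, Spliterator.ORDERED | Spliterator.NONNULL),
                false)
                .onClose(in::close); // propagate close() to the source stream
    }

    public static void main(String[] args) {
        Stream<Integer> naturals = Stream.iterate(0, i -> i + 1); // infinite input
        List<List<Integer>> firstTwo = partitionBy(i -> i / 3, naturals)
                .limit(2)
                .collect(Collectors.toList());
        System.out.println(firstTwo); // [[0, 1, 2], [3, 4, 5]]
    }
}
```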

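  • Well, now that I've built it this way, I also find it to be the more common need. If I emitted a stream and the client needed only the list, I would force them to copy it into a new list. Look at my use case: I need special treatment for the first list element, which I fetch before entering a loop over all the elements. With a stream that would be *very* awkward. (3 upvotes)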
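  • That would break the semantics of the inner stream: what if you issued `partitionedStream.skip(1)...`? In general, accessing any element of the stream would require full consumption (and retention!) of all its predecessors. (2 upvotes)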
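  • @annesadleir Have you tried it? I'd expect two bugs. First, the return value based on `didAdvance` would be wrong for the last advance. Second, every subsequent call to `tryAdvance()` would fail with an NPE. The second bug actually depends on the partitioning function, but since that function may assume a non-null argument, it is very likely to fail when passed `null`. (2 upvotes)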