是否有一种优雅的方式来处理块中的流?

Boh*_*ian 40 java chunking java-8 java-stream

我的确切方案是批量插入数据库,所以我想累积DOM对象然后每1000个,刷新它们.

我通过将代码放入累加器来检测丰满度然后刷新来实现它,但这似乎是错误的 - 刷新控件应该来自调用者.

我可以将流转换为List然后以迭代方式使用subList,但这似乎也很笨拙.

有一个简洁的方法来处理每n个元素然后继续流,而只处理流一次?

Mis*_*sha 12

优雅是旁观者的眼睛.如果你不介意使用有状态函数groupingBy,你可以这样做:

AtomicInteger counter = new AtomicInteger();

stream.collect(groupingBy(x->counter.getAndIncrement()/chunkSize))
    .values()
    .forEach(database::flushChunk);
Run Code Online (Sandbox Code Playgroud)

与原始解决方案相比,这不会赢得任何性能或内存使用点,因为它在执行任何操作之前仍会实现整个流.

如果您想避免实现列表,流API将无法帮助您.你必须得到流的迭代器或分裂器,并做这样的事情:

Spliterator<Integer> split = stream.spliterator();
int chunkSize = 1000;

while(true) {
    List<Integer> chunk = new ArrayList<>(size);
    for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++){};
    if (chunk.isEmpty()) break;
    database.flushChunk(chunk);
}
Run Code Online (Sandbox Code Playgroud)


dmi*_*vim 12

上面的大多数答案都没有使用像节省内存这样的流好处。可以尝试使用迭代器解决问题

Stream<List<T>> chunk(Stream<T> stream, int size) {
  Iterator<T> iterator = stream.iterator();
  Iterator<List<T>> listIterator = new Iterator<>() {

    public boolean hasNext() {
      return iterator.hasNext();
    }

    public List<T> next() {
      List<T> result = new ArrayList<>(size);
      for (int i = 0; i < size && iterator.hasNext(); i++) {
        result.add(iterator.next());
      }
      return result;
    }
  };
  return StreamSupport.stream(((Iterable<List<T>>) () -> listIterator).spliterator(), false);
}
Run Code Online (Sandbox Code Playgroud)

  • 非常好的解决方案,+1。只有一项改进:您可能希望将流返回为“return StreamSupport.stream(Spliterators.spliteratorUnknownSize(listIterator, Spliterator.ORDERED), false);”。 (2认同)

use*_*648 7

如果您对项目有番石榴依赖,则可以执行以下操作:

StreamSupport.stream(Iterables.partition(simpleList, 1000).spliterator(), false).forEach(...);
Run Code Online (Sandbox Code Playgroud)

参见https://google.github.io/guava/releases/23.0/api/docs/com/google/common/collect/Lists.html#partition-java.util.List-int-

  • 此解决方案拆分列表而不是流。有用,但不是@Bohemian询问的内容。 (3认同)

Naz*_*iuk 6

使用库StreamEx解决方案看起来像

Stream<Integer> stream = IntStream.iterate(0, i -> i + 1).boxed().limit(15);
AtomicInteger counter = new AtomicInteger(0);
int chunkSize = 4;

StreamEx.of(stream)
        .groupRuns((prev, next) -> counter.incrementAndGet() % chunkSize != 0)
        .forEach(chunk -> System.out.println(chunk));
Run Code Online (Sandbox Code Playgroud)

输出:

[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9, 10, 11]
[12, 13, 14]
Run Code Online (Sandbox Code Playgroud)

groupRuns 接受谓词,决定2个元素是否应该在同一个组中.

一旦找到不属于它的第一个元素,它就会生成一个组.


Vas*_*sky 6

这是简单的包装 spliterator 实现,它将源元素分组为块:

public class ChunkedSpliterator<T> implements Spliterator<List<T>> {
    private static final int PROMOTED_CHARACTERISTICS = Spliterator.ORDERED | Spliterator.DISTINCT | Spliterator.SIZED | Spliterator.IMMUTABLE | Spliterator.CONCURRENT;
    private static final int SELF_CHARACTERISTICS = Spliterator.NONNULL;

    private final Spliterator<T> src;
    private final int chunkSize;

    public ChunkedSpliterator(Spliterator<T> src, int chunkSize) {
        if (chunkSize < 1)
            throw new IllegalArgumentException("chunkSize must be at least 1");
        this.src = src;
        this.chunkSize = chunkSize;
    }

    public static <E> Stream<List<E>> chunkify(Stream<E> src, int chunkSize) {
        ChunkedSpliterator<E> wrap = new ChunkedSpliterator<>(src.spliterator(), chunkSize);
        return StreamSupport.stream(wrap, src.isParallel());
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<T>> action) {
        List<T> result = new ArrayList<>((int) Math.min(src.estimateSize(), chunkSize));
        for (int i = 0; i < chunkSize; ++i) {
            if (!src.tryAdvance(result::add))
                break;
        }
        if (result.isEmpty())
            return false;
        action.accept(result);
        return true;
    }

    @Override
    public Spliterator<List<T>> trySplit() {
        Spliterator<T> srcSplit = src.trySplit();
        return srcSplit == null ? null : new ChunkedSpliterator<>(srcSplit, chunkSize);
    }

    @Override
    public long estimateSize() {
        long srcSize = src.estimateSize();
        if (srcSize <= 0L) return 0L;
        if (srcSize == Long.MAX_VALUE) return Long.MAX_VALUE;
        return (srcSize - 1) / chunkSize + 1;
    }

    @Override
    public int characteristics() {
        return (src.characteristics() & PROMOTED_CHARACTERISTICS) | SELF_CHARACTERISTICS;
    }
}
Run Code Online (Sandbox Code Playgroud)

有一个方便的chunkify快捷方法可以使事情变得更容易:

    Stream<T> input = ...;
    Stream<List<T>> chunked = ChunkedSpliterator.chunkify(input, 1000);
Run Code Online (Sandbox Code Playgroud)

尽管调用Stream.spliterator()是终端操作,但它实际上并没有强制耗尽流的源。因此,它可以通过其 spliterator 逐渐进行处理,而无需获取内存中的所有数据 - 仅针对每个块。

该分离器保留了输入的大部分特征。但是,它不是子大小的(块可能会在中间分割),也不会排序(即使元素可排序,如何对块进行排序也不明显)并且仅生成非空块(尽管块仍然可能具有空元素)。我对并发/不可变不是100%确定,但似乎它应该毫无问题地继承这些。此外,生成的块可能不严格符合请求的大小,但绝不会超过它。

事实上,我很惊讶这样一个流行的问题在近7(!)年里没有引入自定义 spliterator 的答案。


Pet*_*ser 5

您可以创建一个数据块的数据流List<T>一)的项目流和给定块大小

  • 通过块索引(元素索引/块大小)对项目进行分组
  • 按其索引对块进行排序
  • 将地图缩小为仅其有序元素

码:

public static <T> Stream<List<T>> chunked(Stream<T> stream, int chunkSize) {
    AtomicInteger index = new AtomicInteger(0);

    return stream.collect(Collectors.groupingBy(x -> index.getAndIncrement() / chunkSize))
            .entrySet().stream()
            .sorted(Map.Entry.comparingByKey()).map(Map.Entry::getValue);
}
Run Code Online (Sandbox Code Playgroud)

用法示例:

Stream<Integer> stream = IntStream.range(0, 100).mapToObj(Integer::valueOf);
Stream<List<Integer>> chunked = chunked(stream, 8);
chunked.forEach(chunk -> System.out.println("Chunk: " + chunk));
Run Code Online (Sandbox Code Playgroud)

输出:

Chunk: [0, 1, 2, 3, 4, 5, 6, 7]
Chunk: [8, 9, 10, 11, 12, 13, 14, 15]
Chunk: [16, 17, 18, 19, 20, 21, 22, 23]
Chunk: [24, 25, 26, 27, 28, 29, 30, 31]
Chunk: [32, 33, 34, 35, 36, 37, 38, 39]
Chunk: [40, 41, 42, 43, 44, 45, 46, 47]
Chunk: [48, 49, 50, 51, 52, 53, 54, 55]
Chunk: [56, 57, 58, 59, 60, 61, 62, 63]
Chunk: [64, 65, 66, 67, 68, 69, 70, 71]
Chunk: [72, 73, 74, 75, 76, 77, 78, 79]
Chunk: [80, 81, 82, 83, 84, 85, 86, 87]
Chunk: [88, 89, 90, 91, 92, 93, 94, 95]
Chunk: [96, 97, 98, 99]
Run Code Online (Sandbox Code Playgroud)

  • 该解决方案将在处理块之前将完整的流读入映射,而不是在“中流”生成块。这可能不是您想要/期望的,特别是对于大型流,这可能是分块处理的最大用例。 (3认同)