And*_*ang 80 java batch-processing java-8 java-stream
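I have a large file that contains a list of items.
I would like to create batches of items and make an HTTP request with each batch (all of the items are needed as parameters in the HTTP request). I can do it very easily with a for loop, but as a Java 8 lover I want to try writing this with Java 8's Stream framework (and reap the benefits of lazy processing).
Example: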
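List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) {
    process(batch);
    batch.clear(); // start a fresh batch; without this the list keeps growing past BATCH_SIZE
  }
}
if (batch.size() > 0) process(batch); // remaining partial batch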
I want to do something along the lines of
lazyFileStream.group(500).map(processBatch).collect(toList())
What would be the best way to do that?
Ben*_*nes 103
Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);
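In the question the collection is already available, so a stream isn't needed and it can be written with Guava's Iterables.partition as: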
Iterables.partition(data, batchSize).forEach(this::process);
Tag*_*eev 48
A pure Java 8 implementation is also possible:
int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
         .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
         .forEach(batch -> process(batch));
Note that, unlike jOOλ, it works nicely in parallel (provided that your data is a random-access list).
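To make the index arithmetic easier to reuse, here is a minimal sketch that wraps the same subList idea in a helper returning a Stream of batches. The class name SubListBatches and the method name batches are my own, purely for illustration. Each batch is a view of the backing list, so nothing is copied, and the parallel run keeps encounter order when collected.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class SubListBatches {
    // Hypothetical helper: exposes consecutive batches as subList views of data.
    static <T> Stream<List<T>> batches(List<T> data, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int batchCount = (data.size() + batchSize - 1) / batchSize; // ceiling division
        return IntStream.range(0, batchCount)
                .mapToObj(i -> data.subList(i * batchSize,
                                            Math.min(data.size(), (i + 1) * batchSize)));
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        List<List<Integer>> result = batches(data, 3).parallel().collect(Collectors.toList());
        System.out.println(result); // [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
    }
}
```

Because this needs data.size() up front, it suits an in-memory list rather than a lazily read file.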
roh*_*ats 31
Pure Java 8 solution:
We can create a custom collector to do this elegantly. It takes a batch size and a Consumer that processes each batch:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;
import static java.util.Objects.requireNonNull;
/**
 * Collects elements in the stream and calls the supplied batch processor
 * after the configured batch size is reached.
 *
 * In case of a parallel stream, the batch processor may be called with
 * elements less than the batch size.
 *
 * The elements are not kept in memory, and the final result will be an
 * empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {
    private final int batchSize;
    private final Consumer<List<T>> batchProcessor;
    /**
     * Constructs the batch collector
     *
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        batchProcessor = requireNonNull(batchProcessor);
        this.batchSize = batchSize;
        this.batchProcessor = batchProcessor;
    }
    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }
    public BiConsumer<List<T>, T> accumulator() {
        return (ts, t) -> {
            ts.add(t);
            if (ts.size() >= batchSize) {
                batchProcessor.accept(ts);
                ts.clear();
            }
        };
    }
    public BinaryOperator<List<T>> combiner() {
        return (ts, ots) -> {
            // process each parallel list without checking for batch size
            // avoids adding all elements of one to another
            // can be modified if a strict batching mode is required
            batchProcessor.accept(ts);
            batchProcessor.accept(ots);
            return Collections.emptyList();
        };
    }
    public Function<List<T>, List<T>> finisher() {
        return ts -> {
            // flush the final partial batch, skipping the call when nothing is left
            if (!ts.isEmpty()) {
                batchProcessor.accept(ts);
            }
            return Collections.emptyList();
        };
    }
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}
(Optional) Then create a helper utility class:
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;
public class StreamUtils {
    /**
     * Creates a new batch collector
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @param <T> the type of elements being processed
     * @return a batch collector instance
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        return new BatchCollector<T>(batchSize, batchProcessor);
    }
}
Example usage:
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();
int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);
input.stream()
     .collect(StreamUtils.batchCollector(batchSize, batchProcessor));
I have also posted my code on GitHub, in case anyone wants to take a look.
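One thing to watch with the collector above: the accumulator hands the same list instance to the batch processor and then clears it, so a processor that stores the batch (instead of copying its elements, as the usage example does with addAll) ends up holding an empty list. Below is a rough sketch of a variant that passes a copy instead. It is built with Collector.of, the names CopyingBatchCollector and batching are hypothetical, and it assumes a sequential stream (the combiner simply refuses to run).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Stream;

class CopyingBatchCollector {
    // Hypothetical variant: each batch handed to the processor is an independent copy.
    static <T> Collector<T, List<T>, Void> batching(int batchSize, Consumer<List<T>> processor) {
        return Collector.of(
                ArrayList::new,
                (buffer, item) -> {
                    buffer.add(item);
                    if (buffer.size() >= batchSize) {
                        processor.accept(new ArrayList<>(buffer)); // defensive copy
                        buffer.clear();
                    }
                },
                (left, right) -> {
                    // assumption of this sketch: sequential use only
                    throw new UnsupportedOperationException("sequential streams only");
                },
                buffer -> {
                    if (!buffer.isEmpty()) {
                        processor.accept(new ArrayList<>(buffer)); // final partial batch
                    }
                    return null;
                });
    }

    static List<List<Integer>> demo() {
        List<List<Integer>> seen = new ArrayList<>();
        Consumer<List<Integer>> sink = seen::add; // stores the batch itself, which is safe here
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).collect(batching(3, sink));
        return seen; // [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
    }
}
```

The copy costs one extra allocation per batch, which is usually negligible next to an HTTP request per batch.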
Bru*_*ton 14
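I wrote a custom Spliterator for scenarios like this. It fills lists of a given size from the input Stream. The advantage of this approach is that it performs lazy processing and works with the other stream functions.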
public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
    return batchSize <= 0
        ? Stream.of(stream.collect(Collectors.toList()))
        : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}
private static class BatchSpliterator<E> implements Spliterator<List<E>> {
    private final Spliterator<E> base;
    private final int batchSize;
    public BatchSpliterator(Spliterator<E> base, int batchSize) {
        this.base = base;
        this.batchSize = batchSize;
    }
    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final List<E> batch = new ArrayList<>(batchSize);
        for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
            ;
        if (batch.isEmpty())
            return false;
        action.accept(batch);
        return true;
    }
    @Override
    public Spliterator<List<E>> trySplit() {
        if (base.estimateSize() <= batchSize)
            return null;
        final Spliterator<E> splitBase = this.base.trySplit();
        return splitBase == null ? null
                : new BatchSpliterator<>(splitBase, batchSize);
    }
    @Override
    public long estimateSize() {
        final double baseSize = base.estimateSize();
        return baseSize == 0 ? 0
                : (long) Math.ceil(baseSize / (double) batchSize);
    }
    @Override
    public int characteristics() {
        return base.characteristics();
    }
}
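To illustrate the lazy-processing claim, here is a small self-contained sketch. It uses a different, simpler construction than the class above: an anonymous Spliterators.AbstractSpliterator that does not attempt to split, so it is sequential only. The names LazyBatches and batches are made up for the example. Taking two batches from an infinite stream terminates, which it could not do if the source were consumed eagerly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LazyBatches {
    // Hypothetical sequential-only batching; elements are pulled only on demand.
    static <T> Stream<List<T>> batches(Stream<T> source, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Spliterator<T> base = source.spliterator();
        Spliterator<List<T>> batching =
                new Spliterators.AbstractSpliterator<List<T>>(Long.MAX_VALUE, Spliterator.ORDERED) {
                    @Override
                    public boolean tryAdvance(Consumer<? super List<T>> action) {
                        List<T> batch = new ArrayList<>(batchSize);
                        while (batch.size() < batchSize && base.tryAdvance(batch::add)) {
                            // keep pulling until the batch is full or the source is exhausted
                        }
                        if (batch.isEmpty()) {
                            return false;
                        }
                        action.accept(batch);
                        return true;
                    }
                };
        // Closing the batched stream also closes the source (useful for file-backed streams).
        return StreamSupport.stream(batching, false).onClose(source::close);
    }

    static List<List<Integer>> firstTwoBatches() {
        return batches(Stream.iterate(1, i -> i + 1), 3) // infinite source
                .limit(2)
                .collect(Collectors.toList()); // [[1, 2, 3], [4, 5, 6]]
    }
}
```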
Luk*_*der 11
You can do it with jOOλ, a library that extends Java 8 streams for single-threaded, sequential stream use cases:
Seq.seq(lazyFileStream)              // Seq<String>
   .zipWithIndex()                   // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<Tuple2<String, Long>>>
   .forEach((index, batch) -> {
       // each element of batch is a tuple; tuple.v1 holds the original item
       process(batch);
   });
Behind the scenes, zipWithIndex() is just:
static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();
    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;
        @Override
        public boolean hasNext() {
            return it.hasNext();
        }
        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }
    return seq(new ZipWithIndex());
}
... whereas groupBy() is API convenience for:
default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}
(Disclaimer: I work for the company behind jOOλ)
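Since groupBy() delegates to collect(Collectors.groupingBy(...)), the whole stream is read into a Map before the first batch is processed. Here is a plain-JDK sketch of the same index-and-group idea, without jOOλ. The names IndexGrouping and groupByIndex are hypothetical, and the stateful counter assumes a sequential stream.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class IndexGrouping {
    // Hypothetical helper: groups items by (running index / batchSize).
    // The entire stream is consumed before the map is returned, so this is eager.
    static <T> Map<Long, List<T>> groupByIndex(Stream<T> items, int batchSize) {
        AtomicLong index = new AtomicLong();
        // Stateful classifier: only meaningful for sequential streams.
        Function<T, Long> batchNumber = item -> index.getAndIncrement() / batchSize;
        return items.sequential()
                .collect(Collectors.groupingBy(batchNumber, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        Map<Long, List<String>> batches = groupByIndex(Stream.of("a", "b", "c", "d", "e"), 2);
        System.out.println(batches); // {0=[a, b], 1=[c, d], 2=[e]}
    }
}
```

The TreeMap keeps the batches in index order; for a file larger than memory, one of the iterator or spliterator approaches is the better fit.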
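We had a similar problem to solve. We wanted to take a stream that was larger than system memory (iterating through all objects in a database) and randomise the order as well as possible. We decided it would be acceptable to buffer 10,000 items at a time and randomise those.
The target was a function which took in a stream.
Of the solutions proposed here, there seems to be a range of options.
Our instinct was originally to use a custom collector, but this meant dropping out of streaming. The custom collector solution above is very good and we nearly used it.
Here's a solution which cheats by using the fact that Streams can give you an Iterator, which you can use as an escape hatch to do something extra that streams don't support. The Iterator is then converted back to a stream using another bit of Java 8 StreamSupport sorcery.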
/**
 * An iterator which returns batches of items taken from another iterator
 */
public class BatchingIterator<T> implements Iterator<List<T>> {
    /**
     * Given a stream, convert it to a stream of batches no greater than the
     * batchSize.
     * @param originalStream to convert
     * @param batchSize maximum size of a batch
     * @param <T> type of items in the stream
     * @return a stream of batches taken sequentially from the original stream
     */
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
        return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
    }
    private static <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator,ORDERED),
            false);
    }
    private int batchSize;
    private List<T> currentBatch;
    private Iterator<T> sourceIterator;
    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.sourceIterator = sourceIterator;
    }
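    @Override
    public boolean hasNext() {
        // only pull a new batch once the previous one has been handed out,
        // so repeated hasNext() calls do not silently drop a batch
        if (currentBatch == null) {
            prepareNextBatch();
        }
        return !currentBatch.isEmpty();
    }
    @Override
    public List<T> next() {
        if (!hasNext()) {
            throw new java.util.NoSuchElementException();
        }
        List<T> batch = currentBatch;
        currentBatch = null; // mark as consumed
        return batch;
    }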
    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(sourceIterator.next());
        }
    }
}
A simple example of using this would look like this:
@Test
public void getsBatches() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        .forEach(System.out::println);
}
The above prints
[A, B, C]
[D, E, F]
For our use case, we wanted to shuffle the batches and then keep them as a stream. It looked like this:
@Test
public void howScramblingCouldBeDone() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
        .map(list -> {
            Collections.shuffle(list); return list; })
        .flatMap(List::stream)
        .forEach(System.out::println);
}
This outputs something like the following (it's randomised, so different every time)
A
C
B
E
D
F
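The secret sauce here is that there is always a stream, so you can either operate on a stream of batches, or do something to each batch and then flatMap it back into a stream. Even better, all of the above only runs as the final forEach or collect or other terminating expression PULLS the data through the stream.
It turns out that iterator is a special type of terminating operation on a stream and does not cause the whole stream to run and come into memory! Thanks to the Java 8 guys for a brilliant design!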
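If you want to convince yourself that iterator() pulls elements on demand, a quick experiment along these lines should do. It is only a sketch: the class and method names are invented for the example, and the exact number of elements pulled is an implementation detail, so the code just reports the count rather than promising a value.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class IteratorLaziness {
    // Reads two items through iterator() from an infinite stream and
    // returns how many source elements were generated to serve them.
    static int elementsPulledAfterTwoReads() {
        AtomicInteger generated = new AtomicInteger();
        Iterator<Integer> it = Stream.iterate(1, i -> i + 1) // infinite source
                .peek(i -> generated.incrementAndGet())      // count what flows through
                .iterator();
        it.next();
        it.next();
        return generated.get();
    }

    public static void main(String[] args) {
        // A small number here (rather than a hang) shows the stream was not run to completion.
        System.out.println("elements pulled: " + elementsPulledAfterTwoReads());
    }
}
```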
You could also use RxJava:
Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));
or
Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();
or
Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
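You could also take a look at cyclops-react; I am the author of this library. It implements the jOOλ interface (and by extension JDK 8 Streams), but unlike JDK 8 Parallel Streams it focuses on asynchronous operations (such as potentially blocking async I/O calls). JDK Parallel Streams, by contrast, focus on data parallelism for CPU-bound operations. It works by managing aggregates of Future-based tasks under the hood, while presenting a standard extended Stream API to end users.
This sample code may help you get started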
LazyFutureStream.parallelCommonBuilder()
                .react(data)
                .grouped(BATCH_SIZE)                  
                .map(this::process)
                .run();
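A more general tutorial is also available.
To use your own thread pool (which is probably more appropriate for blocking I/O), you could start processing with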
     LazyReact reactor = new LazyReact(40);
     reactor.react(data)
            .grouped(BATCH_SIZE)                  
            .map(this::process)
            .run();