Java Stream:将用户输入转换为批处理

Yuv*_*val 4 lambda java-8 java-stream

我想流式传输(来自用户输入)并输出特定长度的列表流 - 有效地将用户的输入批量批处理,然后对其进行一些其他工作.所以基本上,对于用户输入:1,2,3,4,5,6,7,8,9我可以将它拆分为这些批次 <1,2,3> , <4,5,6> , <7,8,9>一旦我收集了3个数字,我想创建一个列表,供下一个处理步骤使用.我试图在java 8中使用lamda和stream操作来实现这一点,以便了解更多信息.

我能找到的唯一相关样本是http://www.nurkiewicz.com/2014/07/grouping-sampling-and-batching-custom.html与自定义收集器,它做了一些非常类似于我想要的东西 - 问题使用收集器是因为我不想等待流的结束,而是在准备好时处理每个批处理.

有一些简单的方法吗?它是否不适合使用Java 8流进行此类操作?

Tag*_*eev 5

一般来说,使用Java-8流API解决问题是有问题的.首先,将流拆分为固定大小的批处理对于并行流是不可能的,因为源可能将任务划分为某个未知偏移量,因此在实际处理所有先前的子任务之前,通常不能知道当前流元素的索引(这会杀死整个并行化的想法).由于Stream API的想法是在并行和顺序模式下以相同的方式工作,因此它根本没有方法将流元素组合成均匀的批处理.有一些第三方解决方案通常会忽略并行流的存在(如protonpack StreamUtils.windowed),但一般来说,从一开始就生成批次而不是转换流更清晰.

第二个问题是Java-8中没有现成的工具来从标准输入获取数字(或至少是令牌)作为Stream(你只能使用线路BufferedReader.lines()).在Java-9中会有所改进,因为流支持被添加到Scanner类中(参见JDK-8072722),但是目前你需要做一些额外的步骤.

最后,如果您设法创建数字批次流,则需要在标准输入完成后立即完成.这是takeWhile操作的工作,它也只出现在Java-9中(参见JDK-8071597).

我可以负担得起涉及我的StreamEx库的解决方案,尽管我仍然不喜欢它:

Scanner sc = new Scanner(System.in).useDelimiter("[\n\r,\\s]+");
Iterable<String> iterable = () -> sc;
// Supplier which returns Lists of up to 3 numbers from System.in
Supplier<List<Integer>> triples = () -> StreamEx.of(iterable.spliterator())
        .map(Integer::valueOf).limit(3).toList();
StreamEx.generate(triples).takeWhile(list -> !list.isEmpty())
        // replace with your own stream operations
        // they will be executed as soon as three numbers are entered
        .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

这里使用的StreamEx的主要特征StreamEx.takeWhile是Java-9 的后端Stream.takeWhile.

如果您更喜欢使用jOOL,它会更简单:

Scanner sc = new Scanner(System.in).useDelimiter("[\n\r,\\s]+");
Supplier<List<Integer>> triples = () -> Seq.seq(sc).map(Integer::valueOf).limit(3).toList();
Seq.generate(triples).limitUntil(List::isEmpty)
    .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

这个想法是一样的.Spliterator这里不需要创建,因为jOOL有Seq.seq(Iterator)方法.

最后这里是protonpack解决方案.我个人不喜欢这个库,但解决方案看起来很短,所以有人可能更喜欢它:

import static com.codepoetics.protonpack.StreamUtils.*;

Scanner sc = new Scanner(System.in).useDelimiter("[\n\r,\\s]+");
Stream<List<Integer>> stream = takeUntil(windowed(
    stream(() -> sc).map(Integer::valueOf), 3, 3), List::isEmpty);
stream.forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

这里的问题是,由于某种原因,它会延迟批处理,直到形成下一批.如果它具有少于3个元素,它也不会创建最终批次.此问题已在trunc中修复,但尚未发布.