流处理同时依赖上一个和下一个元素

Jur*_*uru 5 java lambda java-8 java-stream

我必须处理一个包含预定义记录布局的固定宽度文件,存在多种类型的记录,并且记录的第一个字符决定了它的类型。因为它是固定宽度,所以在一行中并不总是可以容纳整个记录类型,所以第二个字符是记录的序列号。例如:

0This is the header record------------------------------------
1This is another record always existing out of one lin--------
21This is a record that can be composed out of multiple parts.
22This is the second part of record type 2--------------------
21This is a new record of type 2, first part.-----------------
22This is the second part of record type 2--------------------
23This is the third part of record type 2---------------------
...
Run Code Online (Sandbox Code Playgroud)

使用 Stream API,我想解析这个文件:

Stream<String> lines = Files.lines(Paths.get(args[1]));

lines.map(line -> RecordFactory.createRecord(line)).collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

但是由于这个流是逐行传递的,当它解析记录类型 2 的第一行(记录类型 2 序列 1)时,记录 2 的映射是不完整的。下一行(记录类型 2 序列 2)应添加到前一个映射的结果中。

如何用 lambda 解决这个问题而不必破坏线程安全?

Tun*_*aki 4

当前使用 Stream API 不容易实现对与谓词匹配的连续元素进行操作。

一种选择是使用提供以下操作的StreamExgroupRuns库:

返回一个由该流的元素列表组成的流,其中相邻元素根据提供的谓词进行分组。

以下代码将连续行的记录部分编号严格大于前一行的记录部分编号的行组合在一起。使用正则表达式提取记录号,该正则表达式查找第一个忽略的数字之后的所有数字。

private static final Pattern PATTERN = Pattern.compile("\\d(\\d+)");

public static void main(String[] args) throws IOException {
    try (StreamEx<String> stream = StreamEx.ofLines(Paths.get("..."))) {
        List<Record> records =
            stream.groupRuns((s1, s2) -> getRecordPart(s2) > getRecordPart(s1))
                  .map(RecordFactory::createRecord)
                  .toList();
    }
}

private static final int getRecordPart(String str) {
    Matcher matcher = PATTERN.matcher(str);
    if (matcher.find()) {
        return Integer.parseInt(matcher.group(1));
    }
    return 1; // if the pattern didn't find anything, it means the record is on a single line
}
Run Code Online (Sandbox Code Playgroud)

这假设您将从a而不是从 aRecordFactory创建a 。请注意,此解决方案可以并行运行,但如果您想要更好的并行性能(以内存为代价),将文件的内容存储到列表中并进行后处理可能会更好。RecordList<String>StringList

  • 如果有一个正确编写的收集器,它接受形成单个记录的字符串流并生成结果记录,那么也可以使用“StreamEx.collapse(BiPredicate, Collector)”。根据情况,这可能比生成中间“列表”更优化。例如,如果您想将单个记录的行连接在一起,您可以使用 StreamEx.collapse(BiPredicate, Collectors.joining("\n"))`。 (2认同)
  • 实际上 `StreamEx.groupRuns(BiPredicate)` 与 `StreamEx.collapse(BiPredicate, Collectors.toList())` 相同,尽管它在内部进行了一些优化。 (2认同)