在Apache Beam中查看与文件模式匹配的新文件

jkf*_*kff 5 google-cloud-dataflow apache-beam

我在GCS或其他受支持的文件系统上有一个目录,外部进程正在向该文件系统写入新文件.

我想写一个Apache Beam流管道,它不断地在这个目录中查看新文件,并在每个新文件到达时读取和处理它们.这可能吗?

jkf*_*kff 6

这可以从Apache Beam 2.2.0开始.几个API支持此用例:

如果你正在使用TextIOAvroIO,他们明确地通过TextIO.read().watchForNewFiles()和支持这个readAll(),例如:

PCollection<String> lines = p.apply(TextIO.read()
    .from("gs://path/to/files/*")
    .watchForNewFiles(
        // Check for new files every 30 seconds
        Duration.standardSeconds(30),
        // Never stop checking for new files
        Watch.Growth.<String>never()));
Run Code Online (Sandbox Code Playgroud)

如果您使用的是其他文件格式,则可以使用FileIO.match().continuously()FileIO.matchAll().continuously()支持相同的API FileIO.readMatches().

API支持指定检查新文件的频率,以及何时停止检查(支持的条件是例如"如果在给定时间内没有出现新输出","在观察N个输出之后","在开始检查后的给定时间之后) "和他们的组合).

请注意,此功能目前仅适用于Direct runner和Dataflow runner,仅适用于Java SDK.通常,它适用于任何支持Splittable DoFn的运行器(参见功能矩阵).