jkf*_*kff 5 google-cloud-dataflow apache-beam
我在GCS或其他受支持的文件系统上有一个目录,外部进程正在向该文件系统写入新文件.
我想写一个Apache Beam流管道,它不断地在这个目录中查看新文件,并在每个新文件到达时读取和处理它们.这可能吗?
这可以从Apache Beam 2.2.0开始.几个API支持此用例:
如果你正在使用TextIO或AvroIO,他们明确地通过TextIO.read().watchForNewFiles()和支持这个readAll(),例如:
PCollection<String> lines = p.apply(TextIO.read()
.from("gs://path/to/files/*")
.watchForNewFiles(
// Check for new files every 30 seconds
Duration.standardSeconds(30),
// Never stop checking for new files
Watch.Growth.<String>never()));
Run Code Online (Sandbox Code Playgroud)
如果您使用的是其他文件格式,则可以使用FileIO.match().continuously()和FileIO.matchAll().continuously()支持相同的API FileIO.readMatches().
API支持指定检查新文件的频率,以及何时停止检查(支持的条件是例如"如果在给定时间内没有出现新输出","在观察N个输出之后","在开始检查后的给定时间之后) "和他们的组合).
请注意,此功能目前仅适用于Direct runner和Dataflow runner,仅适用于Java SDK.通常,它适用于任何支持Splittable DoFn的运行器(参见功能矩阵).
| 归档时间: |
|
| 查看次数: |
2978 次 |
| 最近记录: |