iLi*_*ast 3 google-cloud-dataflow
数据流管道运行完成后是否可以在 GCS 中移动文件?如果是这样,怎么办?应该是最后一个吧.apply?我无法想象事情会是这样。
这里的情况是我们从客户端导入大量 .csv 文件。我们需要无限期地保留这些 CSV,因此我们要么需要“将 CSV 标记为已处理”,要么将它们移出用于TextIO查找 CSV 的初始文件夹。我目前唯一能想到的就是在 BigQuery 中存储文件名(我不确定如何得到这个,我是 DF 新手),然后从执行中排除已经存储的文件管道不知何故?但必须有更好的方法。
这可能吗?我应该检查什么?
谢谢你的帮助!
之后您可以尝试BlockingDataflowPipelineRunner在主程序中使用并运行任意逻辑p.run()(它将等待管道完成)。
请参阅指定执行参数,特别是“阻止执行”部分。
然而,一般来说,您似乎确实需要一个连续运行的管道来监视包含 CSV 文件的目录并在新文件出现时导入它们,而不会两次导入同一文件。这对于流管道来说是一个很好的例子:您可以编写一个自定义的UnboundedSource(另请参阅自定义源和接收器)来监视目录并返回其中的文件名(即T可能是Stringor GcsPath):
p.apply(Read.from(new DirectoryWatcherSource(directory)))
.apply(ParDo.of(new ReadCSVFileByName()))
.apply(the rest of your pipeline)
Run Code Online (Sandbox Code Playgroud)
DirectoryWatcherSource你的,UnboundedSource也是ReadCSVFileByName你需要编写的一个转换,它接受一个文件路径并将其读取为 CSV 文件,返回其中的记录(不幸的是,现在你不能像TextIO.Read在管道中间那样使用转换,只能一开始 - 我们正在努力解决这个问题)。
这可能有点棘手,正如我所说,我们正在开发一些功能,使其变得更加简单,我们正在考虑创建一个类似的内置源,但目前这可能仍然比“弹球工作”。请尝试一下,dataflow-feedback@google.com如果有任何不清楚的地方请告诉我们!
同时,您还可以在Cloud Bigtable中存储有关您已处理或尚未处理的文件的信息- 它比 BigQuery 更适合这一点,因为它更适合随机写入和查找,而 BigQuery 更适合大型数据库对整个数据集进行批量写入和查询。
| 归档时间: |
|
| 查看次数: |
1751 次 |
| 最近记录: |