在 Apache Beam / Dataflow Python 流中写入文本文件

Dan*_*ias 6 google-cloud-storage google-cloud-pubsub google-cloud-dataflow apache-beam

我有一个非常基本的 Python Dataflow 作业,它从 Pub/Sub 读取一些数据,应用 FixedWindow 并写入 Google Cloud Storage。

transformed = ...
transformed | beam.io.WriteToText(known_args.output)
Run Code Online (Sandbox Code Playgroud)

输出写入--output中特定的位置,但只是临时阶段,即

gs://MY_BUCKET/MY_DIR/beam-temp-2a5c0e1eec1c11e8b98342010a800004/...some_UUID...
Run Code Online (Sandbox Code Playgroud)

该文件永远不会使用分片模板放入正确命名的位置。

在本地和 DataFlow 运行器上测试。


在进一步测试时,我注意到 streaming_wordcount 示例有相同的问题,但是标准 wordcount 示例写得很好。也许问题在于窗口,或从 pubsub 阅读?


WriteToText 似乎与 PubSub 的流媒体源不兼容。可能有解决方法,或者 Java 版本可能兼容,但我选择完全使用不同的解决方案。

Pab*_*blo 9

WriteToTextPython SDK 中的转换不支持流式传输。

相反,您可以考虑apache_beam.io.fileio. 在这种情况下,您可以编写如下内容(假设为 10 分钟的窗口):

my_pcollection = (p | ReadFromPubSub(....)
                    |  WindowInto(FixedWindows(10*60))
                    |  fileio.WriteToFiles(path=known_args.output))
Run Code Online (Sandbox Code Playgroud)

这足以为每个窗口写出单独的文件,并随着流的推进继续这样做。

你会看到这样的文件(假设输出是gs://mybucket/)。当窗口被触发时,文件将被打印:

gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0000-00002
gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0001-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0000-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0001-00002
...
Run Code Online (Sandbox Code Playgroud)

默认情况下,文件具有$prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix名称 -默认情况下前缀为前缀output,但您可以传递更复杂的文件命名函数。


如果您想自定义文件的写入方式(例如文件的命名、数据的格式或类似的东西),您可以查看WriteToFiles.

您可以在此处看到Beam 测试中使用的转换示例,其中包含更复杂的参数 - 但听起来默认行为对您来说应该足够了。