ris*_*097 1 google-cloud-storage google-cloud-dataflow apache-beam
如何使用 Apache Beam 中的 TextIO 将从 PubSub 收到的消息写入 GCS 中的文本文件?看到了一些方法,如 withWindowedWrites() 和 withFilenamePolicy() 但在文档中找不到任何示例。
这是一个示例,前提是您使用的是 Java SDK (BEAM 2.1.0)。
PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
pipeline.begin()
.apply("PubsubIO",PubsubIO.readStrings()
.withTimestampAttribute("timestamp")
.fromSubscription("projects/YOUR-PROJECT/subscriptions/YOUR-SUBSCRIPTION"))
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30L))))
.apply(TextIO.write().to("gs://YOUR-BUCKET").withWindowedWrites());
Run Code Online (Sandbox Code Playgroud)
您可以通过探索 TextIO.Write.expand(PCollection input) 中的“expand”方法来查看 SDK 用于文件命名的默认值。具体我会看看 DefaultFilenamePolicy.java
| 归档时间: |
|
| 查看次数: |
4187 次 |
| 最近记录: |