我正在尝试使用Google Cloud Dataflow将Google PubSub消息写入Google云端存储.我知道TextIO/AvroIO不支持流媒体管道.但是,我在[1]中读到,可以ParDo/DoFn通过作者的评论在流式传输管道中写入GCS .我尽可能地按照他们的文章构建了一条管道.
我的目标是这种行为:
dataflow-requests/[isodate-time]/[paneIndex].我得到了不同的结果:
我如何解决这些问题并获得我期待的行为?
示例日志输出:
21:30:06.977 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:06.977 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.773 sucessfully write pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.846 sucessfully write pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.847 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
Run Code Online (Sandbox Code Playgroud)
这是我的代码:
package com.example.dataflow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.*;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.gcloud.storage.BlobId;
import com.google.gcloud.storage.BlobInfo;
import …Run Code Online (Sandbox Code Playgroud) google-cloud-storage google-cloud-pubsub google-cloud-dataflow
我很难理解TextIO.write()的.withFileNamePolicy的概念.提供FileNamePolicy的要求似乎非常复杂,因为它可以像指定GCS存储桶来编写流式字段一样简单.
在高层次上,我将JSON消息流式传输到PubSub主题,并且我想将这些原始消息写入GCS中的文件以进行永久存储(我还将对消息进行其他处理).我最初开始使用这个Pipeline,认为这很简单:
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
.apply("Write to GCS", TextIO.write().to(gcs_bucket);
p.run();
}
Run Code Online (Sandbox Code Playgroud)
我收到了需要WindowedWrites的错误,我申请了,然后需要FileNamePolicy.这是事情变得多毛的地方.
我去了梁文档并检查了FilenamePolicy.看起来我需要扩展这个类,然后还需要扩展其他抽象类来使其工作.不幸的是,关于Apache的文档有点不足,我找不到Dataflow 2.0这样做的任何示例,除了Wordcount示例,它甚至用于在帮助器类中实现这些细节.
所以我可以通过复制WordCount的大部分示例来完成这项工作,但我正在努力更好地理解这个细节.我有几个问题:
1)是否有任何路线图项目可以抽象出很多这种复杂性?看起来我应该像在非WindowsWrite中一样提供GCS存储桶,然后只提供一些基本选项,如时序和文件命名规则.我知道将流窗口数据写入文件比打开文件指针(或对象存储等效)更复杂.
2)看起来要做到这一点,我需要创建一个WindowedContext对象,它需要提供一个BoundedWindow抽象类,PaneInfo对象类,然后是一些分片信息.可用于这些的信息非常简单,我很难知道所有这些实际需要什么,特别是考虑到我的简单用例.有没有很好的例子可以实现这些?另外,看起来我还需要将#scilt作为TextIO.write的一部分设置,但是还要将#shads作为fileNamePolicy的一部分提供?
感谢您帮助我理解这背后的细节,希望学到一些东西!
编辑7/20/17 所以我终于通过扩展FilenamePolicy来运行此管道.我的挑战是需要从PubSub定义流数据的窗口.这是代码的非常接近的表示:
public class ReadData {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply("Write to GCS", TextIO.write().to("gcs_bucket")
.withWindowedWrites()
.withFilenamePolicy(new TestPolicy())
.withNumShards(10));
p.run();
}
}
class TestPolicy extends FileBasedSink.FilenamePolicy {
@Override
public …Run Code Online (Sandbox Code Playgroud) java google-cloud-storage google-cloud-platform google-cloud-dataflow
java ×1