小编Chr*_*der的帖子

GCP Dataflow 2.0 PubSub到GCS

我很难理解TextIO.write()的.withFileNamePolicy的概念.提供FileNamePolicy的要求似乎非常复杂,因为它可以像指定GCS存储桶来编写流式字段一样简单.

在高层次上,我将JSON消息流式传输到PubSub主题,并且我想将这些原始消息写入GCS中的文件以进行永久存储(我还将对消息进行其他处理).我最初开始使用这个Pipeline,认为这很简单:

public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        Pipeline p = Pipeline.create(options); 

        p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
            .apply("Write to GCS", TextIO.write().to(gcs_bucket);

        p.run();

    }
Run Code Online (Sandbox Code Playgroud)

我收到了需要WindowedWrites的错误,我申请了,然后需要FileNamePolicy.这是事情变得多毛的地方.

我去了梁文档并检查了FilenamePolicy.看起来我需要扩展这个类,然后还需要扩展其他抽象类来使其工作.不幸的是,关于Apache的文档有点不足,我找不到Dataflow 2.0这样做的任何示例,除了Wordcount示例,它甚至用于在帮助器类中实现这些细节.

所以我可以通过复制WordCount的大部分示例来完成这项工作,但我正在努力更好地理解这个细节.我有几个问题:

1)是否有任何路线图项目可以抽象出很多这种复杂性?看起来我应该像在非WindowsWrite中一样提供GCS存储桶,然后只提供一些基本选项,如时序和文件命名规则.我知道将流窗口数据写入文件比打开文件指针(或对象存储等效)更复杂.

2)看起来要做到这一点,我需要创建一个WindowedContext对象,它需要提供一个BoundedWindow抽象类,PaneInfo对象类,然后是一些分片信息.可用于这些的信息非常简单,我很难知道所有这些实际需要什么,特别是考虑到我的简单用例.有没有很好的例子可以实现这些?另外,看起来我还需要将#scilt作为TextIO.write的一部分设置,但是还要将#shads作为fileNamePolicy的一部分提供?

感谢您帮助我理解这背后的细节,希望学到一些东西!

编辑7/20/17 所以我终于通过扩展FilenamePolicy来运行此管道.我的挑战是需要从Pu​​bSub定义流数据的窗口.这是代码的非常接近的表示:

public class ReadData {
    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        Pipeline p = Pipeline.create(options);

        p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
            .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply("Write to GCS", TextIO.write().to("gcs_bucket")
                .withWindowedWrites()
                .withFilenamePolicy(new TestPolicy())
                .withNumShards(10));

        p.run();

    }
}

class TestPolicy extends FileBasedSink.FilenamePolicy {
    @Override
    public …
Run Code Online (Sandbox Code Playgroud)

java google-cloud-storage google-cloud-platform google-cloud-dataflow

5
推荐指数
2
解决办法
2355
查看次数