相关疑难解决方法(0)

使用DoFn使用Cloud Dataflow从PubSub写入Google云端存储

我正在尝试使用Google Cloud Dataflow将Google PubSub消息写入Google云端存储.我知道TextIO/AvroIO不支持流媒体管道.但是,我在[1]中读到,可以ParDo/DoFn通过作者的评论在流式传输管道中写入GCS .我尽可能地按照他们的文章构建了一条管道.

我的目标是这种行为:

  • 消息以最多100个批次写入GCS中的对象(每个窗口窗格一个),该路径对应于发布消息的时间dataflow-requests/[isodate-time]/[paneIndex].

我得到了不同的结果:

  • 每小时窗口中只有一个窗格.因此,我只在每小时"桶"中获得一个文件(它实际上是GCS中的对象路径).将MAX_EVENTS_IN_FILE减少到10没有区别,仍然只有一个窗格/文件.
  • 每个GCS对象中只有一条消息被写出
  • 写入GCS时,管道偶尔会引发CRC错误.

我如何解决这些问题并获得我期待的行为?

示例日志输出:

21:30:06.977 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:06.977 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.773 sucessfully write pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.846 sucessfully write pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.847 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
Run Code Online (Sandbox Code Playgroud)

这是我的代码:

package com.example.dataflow;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.*;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.gcloud.storage.BlobId;
import com.google.gcloud.storage.BlobInfo;
import …
Run Code Online (Sandbox Code Playgroud)

google-cloud-storage google-cloud-pubsub google-cloud-dataflow

16
推荐指数
1
解决办法
4420
查看次数

GCP Dataflow 2.0 PubSub到GCS

我很难理解TextIO.write()的.withFileNamePolicy的概念.提供FileNamePolicy的要求似乎非常复杂,因为它可以像指定GCS存储桶来编写流式字段一样简单.

在高层次上,我将JSON消息流式传输到PubSub主题,并且我想将这些原始消息写入GCS中的文件以进行永久存储(我还将对消息进行其他处理).我最初开始使用这个Pipeline,认为这很简单:

public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        Pipeline p = Pipeline.create(options); 

        p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
            .apply("Write to GCS", TextIO.write().to(gcs_bucket);

        p.run();

    }
Run Code Online (Sandbox Code Playgroud)

我收到了需要WindowedWrites的错误,我申请了,然后需要FileNamePolicy.这是事情变得多毛的地方.

我去了梁文档并检查了FilenamePolicy.看起来我需要扩展这个类,然后还需要扩展其他抽象类来使其工作.不幸的是,关于Apache的文档有点不足,我找不到Dataflow 2.0这样做的任何示例,除了Wordcount示例,它甚至用于在帮助器类中实现这些细节.

所以我可以通过复制WordCount的大部分示例来完成这项工作,但我正在努力更好地理解这个细节.我有几个问题:

1)是否有任何路线图项目可以抽象出很多这种复杂性?看起来我应该像在非WindowsWrite中一样提供GCS存储桶,然后只提供一些基本选项,如时序和文件命名规则.我知道将流窗口数据写入文件比打开文件指针(或对象存储等效)更复杂.

2)看起来要做到这一点,我需要创建一个WindowedContext对象,它需要提供一个BoundedWindow抽象类,PaneInfo对象类,然后是一些分片信息.可用于这些的信息非常简单,我很难知道所有这些实际需要什么,特别是考虑到我的简单用例.有没有很好的例子可以实现这些?另外,看起来我还需要将#scilt作为TextIO.write的一部分设置,但是还要将#shads作为fileNamePolicy的一部分提供?

感谢您帮助我理解这背后的细节,希望学到一些东西!

编辑7/20/17 所以我终于通过扩展FilenamePolicy来运行此管道.我的挑战是需要从Pu​​bSub定义流数据的窗口.这是代码的非常接近的表示:

public class ReadData {
    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        Pipeline p = Pipeline.create(options);

        p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
            .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply("Write to GCS", TextIO.write().to("gcs_bucket")
                .withWindowedWrites()
                .withFilenamePolicy(new TestPolicy())
                .withNumShards(10));

        p.run();

    }
}

class TestPolicy extends FileBasedSink.FilenamePolicy {
    @Override
    public …
Run Code Online (Sandbox Code Playgroud)

java google-cloud-storage google-cloud-platform google-cloud-dataflow

5
推荐指数
2
解决办法
2355
查看次数