标签: google-cloud-dataflow

如何从App Engine运行Google Cloud Dataflow作业?

阅读Cloud Dataflow文档后,我仍然不确定如何从App Engine运行数据流作业.可能吗?我的后端用Python或Java编写是否相关?谢谢!

google-app-engine google-cloud-dataflow

8
推荐指数
1
解决办法
2245
查看次数

如何获得两个PCollections的笛卡尔积

我对使用Google Cloud Dataflow非常陌生.我想获得两个PCollections的笛卡尔积.举例来说,如果我有两个PCollections (1, 2)("hello", "world"),它们的笛卡尔乘积是((1, "hello"), (1, "world"), (2, "hello"), (2, "world")).

任何想法我怎么能这样做?此外,由于笛卡尔积可能很大,我希望解决方案可以懒得创建产品,从而避免大量内存消耗.

谢谢!

google-cloud-dataflow

8
推荐指数
1
解决办法
546
查看次数

通过Google Cloud Dataflow创建/写入Parititoned BigQuery表

我想利用时间分区表的新BigQuery功能,但不确定这在1.6版本的Dataflow SDK中是否可行.

查看BigQuery JSON API,要创建一个分区表,需要传入一个

"timePartitioning": { "type": "DAY" }
Run Code Online (Sandbox Code Playgroud)

选项,但com.google.cloud.dataflow.sdk.io.BigQueryIO接口仅允许指定TableReference.

我想也许我可以预先创建表,并通过BigQueryIO.Write.toTableReference lambda潜入分区装饰器..?是否有其他人通过Dataflow创建/编写分区表成功?

这似乎与设置当前不可用的表到期时间类似.

google-bigquery google-cloud-dataflow apache-beam-io

8
推荐指数
2
解决办法
4379
查看次数

如何使用事务性DatastoreIO

我正在使用来自流式数据流管道的DatastoreIO,并在使用相同的密钥编写实体时收到错误.

2016-12-10T22:51:04.385Z: Error:   (af00222cfd901860): Exception: com.google.datastore.v1.client.DatastoreException: A non-transactional commit may not contain multiple mutations affecting the same entity., code=INVALID_ARGUMENT
Run Code Online (Sandbox Code Playgroud)

如果我在密钥中使用随机数,那么事情可行,但我需要更新相同的密钥,那么有没有使用DataStoreIO执行此操作的事务方法?

static class CreateEntityFn extends DoFn<KV<String, Tile>, Entity> {
  private static final long serialVersionUID = 0;

  private final String namespace;
  private final String kind;

  CreateEntityFn(String namespace, String kind) {
    this.namespace = namespace;
    this.kind = kind;
  }

  public Entity makeEntity(String key, Tile tile) {
    Entity.Builder entityBuilder = Entity.newBuilder();
    Key.Builder keyBuilder = makeKey(kind, key );
    if (namespace != null) {
      keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
    }
    entityBuilder.setKey(keyBuilder.build()); …
Run Code Online (Sandbox Code Playgroud)

google-cloud-datastore google-cloud-dataflow

8
推荐指数
1
解决办法
1521
查看次数

跳过标题行 - 是否可以使用Cloud DataFlow?

我创建了一个Pipeline,它从GCS中的文件读取,转换它,最后写入BQ表.该文件包含标题行(字段).

有没有办法以编程方式设置"跳过的标题行数",就像加载时在BQ中可以做的那样?

要跳过的标题行数

google-cloud-dataflow

7
推荐指数
1
解决办法
1351
查看次数

从Pipeline中的PCollection GCS文件名中读取文件?

我有一个连接到pub/sub的流管道,它发布了GCS文件的文件名.从那里我想读取每个文件并解析每一行上的事件(事件是我最终想要处理的事件).

我可以使用TextIO吗?当在执行期间定义文件名时,您是否可以在流管道中使用它(而不是使用TextIO作为源,并且fileName(s)在构造时已知).如果不是,我正在考虑做以下事情:

从pub/sub ParDo获取主题以读取每个文件并获取行处理文件的行...

我可以使用FileBasedReader或类似的东西来读取文件吗?文件不是太大,所以我不需要并行读取单个文件,但我需要读取大量文件.

google-cloud-dataflow

7
推荐指数
1
解决办法
891
查看次数

在DataFlow上取消没有dataloss的作业

我正试图找到一种优雅地结束我的工作的方式,以免丢失任何数据,从PubSub流式传输并写入BigQuery.

我可以设想的一种可能的方法是让作业停止提取新数据,然后运行直到它处理完所有内容,但我不知道是否/如何实现这一点.

google-cloud-pubsub google-cloud-dataflow

7
推荐指数
1
解决办法
242
查看次数

允许数据流读取指向Drive的BigQuery表?

BigQuery可以从Google云端硬盘中读取联合来源.看到这里.我希望能够将BigQuery中的表读入指向Drive文档的Dataflow管道中.

将BigQuery挂接到Drive中的文件非常正常:

在此输入图像描述

但是,当我尝试将该表读入我的Dataflow管道时,我(可以理解)得到以下错误:

找不到合适的凭据来访问Google云端硬盘.联系表所有者以获取帮助.

[..]
PCollection<TableRow> results = pipeline.apply("whatever",
BigQueryIO.Read.fromQuery("SELECT * from [CPT_7414_PLAYGROUND.google_drive_test]"))
.apply(ParDo.of(new DoFn<TableRow, TableRow>() {
[..]
Run Code Online (Sandbox Code Playgroud)

我如何使Dataflow能够从BigQuery中指向Drive的表中读取权限?

google-drive-api google-bigquery google-cloud-dataflow

7
推荐指数
1
解决办法
194
查看次数

在Google Cloud Dataflow中自动缩放

我们有一个流管道,我们已启用自动缩放功能.通常,一个工作人员足以处理传入的数据,但是如果存在积压工作,我们希望自动增加工作人员的数量.

我们的管道从Pubsub读取,并且每3分钟使用加载作业将批次写入BigQuery.我们从一个工作者开始运行此管道,向pubsub发布两倍于一个工作者可以使用的数据.2小时后,自动缩放仍未启动,因此积压数据约为1小时.考虑到自动调节旨在将积压保持在10秒以下(根据此SO答案),这似乎相当差.

这里的文档说,流媒体作业的自动调节是测试版,并且如果接收器是高延迟的,那么已知它是粗粒度的.是的,我想每3分钟做一次BigQuery批次就算是高延迟!在改进此自动缩放算法方面是否有任何进展?

在此期间我们可以做任何解决方法,例如测量管道中不同点的吞吐量吗?我找不到有关如何将吞吐量报告给自动扩展系统的任何文档.

autoscaling google-cloud-dataflow

7
推荐指数
1
解决办法
754
查看次数

如何在Python中创建从Pub / Sub到GCS的数据流管道

我想使用Dataflow将数据从发布/订阅移到GCS。因此,基本上我希望Dataflow在固定的时间量(例如15分钟)中累积一些消息,然后在经过该时间量后将这些数据作为文本文件写入GCS。

我的最终目标是创建一个自定义管道,因此“ Pub / Sub to Cloud Storage”模板对我来说还不够,而且我完全不了解Java,这使我开始使用Python进行调整。

这是到目前为止我所获得的(Apache Beam Python SDK 2.10.0):

import apache_beam as beam

TOPIC_PATH="projects/<my-project>/topics/<my-topic>"

def CombineFn(e):
    return "\n".join(e)

o = beam.options.pipeline_options.PipelineOptions()
p = beam.Pipeline(options=o)
data = ( p | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
       | "Window" >> beam.WindowInto(beam.window.FixedWindows(30))
       | "Combine" >> beam.transforms.core.CombineGlobally(CombineFn).without_defaults()
       | "Output" >> beam.io.WriteToText("<GCS path or local path>"))

res = p.run()
res.wait_until_finish()
Run Code Online (Sandbox Code Playgroud)

我在本地环境中运行该程序没有问题。

python main.py
Run Code Online (Sandbox Code Playgroud)

它在本地运行,但可以从指定的Pub / Sub主题读取,并且每隔30秒就会按预期写入指定的GCS路径。

但是现在的问题是,当我在Google Cloud Platform(即Cloud Dataflow)上运行它时,它不断发出神秘的异常。

java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1096: Traceback (most …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-pubsub google-cloud-dataflow apache-beam

7
推荐指数
1
解决办法
1157
查看次数