标签: google-cloud-dataflow

如何从App Engine运行Google Cloud Dataflow作业?

阅读Cloud Dataflow文档后,我仍然不确定如何从App Engine运行数据流作业.可能吗?我的后端用Python或Java编写是否相关?谢谢!

google-app-engine google-cloud-dataflow

8
推荐指数
1
解决办法
2245
查看次数

Google Cloud Dataflow ETL(数据存储 - >转换 - > BigQuery)

我们使用数据存储作为持久性后端在Google App Engine上运行应用程序.目前,应用程序主要具有"OLTP"功能和一些初步报告.在实施报告时,我们遇到使用数据存储区和GQL处理大量数据(数百万个对象)非常困难.为了使用适当的报告和商业智能功能增强我们的应用程序,我们认为最好设置ETL过程以将数据从Datastore移动到BigQuery.

最初我们考虑将ETL过程实现为App Engine cron作业,但看起来Dataflow也可用于此.我们有以下设置流程的要求

  • 能够通过使用BigQuery的非流API将所有现有数据推送到BigQuery.
  • 完成上述操作后,只要使用流API在Datastore到BigQuery中更新/创建任何新数据,就将其推送.

我的问题是

  1. Cloud Dataflow是否适合实施此管道?
  2. 我们能够推送现有数据吗?一些种类有数百万个对象.
  3. 实施它的正确方法应该是什么?我们正在考虑两种方法. 一种方法是通过pub/sub,即对于现有数据创建一个cron作业并将所有数据推送到pub/sub.对于任何新的更新,在DataStore中更新数据的同时将数据推送到pub/sub.Dataflow Pipeline将从pub/sub中选择它并将其推送到BigQuery. 第二种方法是在Dataflow中创建一个批处理管道,它将查询DataStore并将任何新数据推送到BigQuery.

问题是这两种方法可行吗?哪一个更好的成本?有没有比上面两个更好的其他方式?

谢谢,

rizTaak

google-app-engine google-bigquery google-cloud-datastore google-cloud-dataflow

8
推荐指数
1
解决办法
4540
查看次数

将数据流管道的输出写入分区目标

我们有一个单独的流媒体事件源,每秒有数千个事件,这些事件都标有一个id,用于标识事件所属的数万个客户中的哪一个.我们想使用这个事件源来填充数据仓库(在流模式下),但是,我们的事件源不是持久性的,所以我们还希望将原始数据存档在GCS中,以便我们可以通过我们的数据重放它仓库管道,如果我们进行需要它的更改.由于数据保留要求,我们持久存储的任何原始数据都需要由客户进行分区,以便我们可以轻松删除它.

在Dataflow中解决这个问题最简单的方法是什么?目前我们正在使用自定义接收器创建数据流作业,该接收器将数据写入GCS/BigQuery上的每个客户的文件,这是明智的吗?

google-cloud-storage google-cloud-dataflow

8
推荐指数
1
解决办法
1037
查看次数

如何获得两个PCollections的笛卡尔积

我对使用Google Cloud Dataflow非常陌生.我想获得两个PCollections的笛卡尔积.举例来说,如果我有两个PCollections (1, 2)("hello", "world"),它们的笛卡尔乘积是((1, "hello"), (1, "world"), (2, "hello"), (2, "world")).

任何想法我怎么能这样做?此外,由于笛卡尔积可能很大,我希望解决方案可以懒得创建产品,从而避免大量内存消耗.

谢谢!

google-cloud-dataflow

8
推荐指数
1
解决办法
546
查看次数

在从Dataflow插入BigQuery之前验证行

根据我们 如何从数据流加载Bigquery表时设置maximum_bad_records?maxBadRecords从Dataflow将数据加载到BigQuery时,目前无法设置配置.建议在将数据插入BigQuery之前验证Dataflow作业中的行.

如果我有TableSchema和a TableRow,我该如何确保可以安全地将行插入表中?

必须有一种更简单的方法来做到这一点,而不是迭代模式中的字段,查看它们的类型并查看行中值的类,对吧?这似乎容易出错,并且该方法必须是万无一失的,因为如果无法加载单行,整个管道就会失败.

更新:

我的用例是一个ETL作业,最初将在JSON上运行(每行一个对象)登录云存储并批量写入BigQuery,但稍后将从PubSub读取对象并连续写入BigQuery.这些对象包含很多BigQuery中不需要的信息,还包含甚至无法在模式中描述的部分(基本上是自由形式的JSON有效负载).像时间戳这样的东西也需要格式化以与BigQuery一起使用.这个作业的一些变体会在不同的输入上运行并写入不同的表.

从理论上讲,这不是一个非常困难的过程,它需要一个对象,提取一些属性(50-100),格式化其中一些并将对象输出到BigQuery.我或多或少只是循环遍历属性名称列表,从源对象中提取值,查看配置以查看属性是否应该以某种方式格式化,如果需要应用格式(这可能是下行,划分毫秒时间戳) 1000,从URL中提取主机名等),并将值写入TableRow对象.

我的问题是数据混乱.有几亿个物体有一些看起来并不像预期的那样,这种情况很少见,但是这些物品仍然很少见.有时,应包含字符串的属性包含整数,反之亦然.有时会有一个数组或一个应该有字符串的对象.

理想情况下,我想接受TableRow并通过TableSchema并询问"这有效吗?".

因为这是不可能的,所以我做的是查看TableSchema对象并尝试自己验证/转换值.如果TableSchema说属性是STRINGI 类型,则value.toString()在将其添加到之前TableRow.如果是,INTEGER我检查它是a Integer,Long或者BigInteger等等.这种方法的问题在于我只是猜测BigQuery会起什么作用.它接受哪些Java数据类型FLOAT?为了TIMESTAMP?我认为我的验证/演员表可以解决大多数问题,但总有例外和边缘情况.

根据我的经验,这是非常有限的,整个工作流程(工作?工作流程?不确定正确的术语)如果单行失败BigQuery的验证失败(就像常规加载一样,除非maxBadRecords设置为足够大的数字).它也失败了表面有用的消息,如'BigQuery导入作业"dataflow_job_xxx"失败.原因:(5db0b2cdab1557e0):项目"xxx"中的BigQuery作业"dataflow_job_xxx"已完成错误:errorResult:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射,错误:指定了JSON映射对于非记录字段,错误:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射'.也许在哪里可以看到更详细的错误消息,可以告诉我它是哪个属性,价值是什么?没有这些信息,它也可以说"坏数据".

据我所知,至少在批处理模式下运行时,Dataflow会将TableRow对象写入云存储中的临时区域,然后在所有内容完成后启动加载.这意味着我无处可捕获任何错误,我的代码在加载BigQuery时不再运行.我还没有在流模式下运行任何工作,但是我不确定它会有什么不同,从我的(公认有限的)理解基本原理是相同的,它只是批量大小更小.

人们使用Dataflow和BigQuery,因此,如果不必担心由于单个错误输入而导致整个管道停止,就不可能完成这项工作.人们如何做到这一点?

google-bigquery google-cloud-dataflow

8
推荐指数
1
解决办法
1906
查看次数

通过Google Cloud Dataflow创建/写入Parititoned BigQuery表

我想利用时间分区表的新BigQuery功能,但不确定这在1.6版本的Dataflow SDK中是否可行.

查看BigQuery JSON API,要创建一个分区表,需要传入一个

"timePartitioning": { "type": "DAY" }
Run Code Online (Sandbox Code Playgroud)

选项,但com.google.cloud.dataflow.sdk.io.BigQueryIO接口仅允许指定TableReference.

我想也许我可以预先创建表,并通过BigQueryIO.Write.toTableReference lambda潜入分区装饰器..?是否有其他人通过Dataflow创建/编写分区表成功?

这似乎与设置当前不可用的表到期时间类似.

google-bigquery google-cloud-dataflow apache-beam-io

8
推荐指数
2
解决办法
4379
查看次数

如何使用事务性DatastoreIO

我正在使用来自流式数据流管道的DatastoreIO,并在使用相同的密钥编写实体时收到错误.

2016-12-10T22:51:04.385Z: Error:   (af00222cfd901860): Exception: com.google.datastore.v1.client.DatastoreException: A non-transactional commit may not contain multiple mutations affecting the same entity., code=INVALID_ARGUMENT
Run Code Online (Sandbox Code Playgroud)

如果我在密钥中使用随机数,那么事情可行,但我需要更新相同的密钥,那么有没有使用DataStoreIO执行此操作的事务方法?

static class CreateEntityFn extends DoFn<KV<String, Tile>, Entity> {
  private static final long serialVersionUID = 0;

  private final String namespace;
  private final String kind;

  CreateEntityFn(String namespace, String kind) {
    this.namespace = namespace;
    this.kind = kind;
  }

  public Entity makeEntity(String key, Tile tile) {
    Entity.Builder entityBuilder = Entity.newBuilder();
    Key.Builder keyBuilder = makeKey(kind, key );
    if (namespace != null) {
      keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
    }
    entityBuilder.setKey(keyBuilder.build()); …
Run Code Online (Sandbox Code Playgroud)

google-cloud-datastore google-cloud-dataflow

8
推荐指数
1
解决办法
1521
查看次数

'_UnwindowedValues'类型的对象没有len()意味着什么?

我正在使用Dataflow 0.5.5 Python.在非常简单的代码中遇到以下错误:

print(len(row_list))
Run Code Online (Sandbox Code Playgroud)

row_list是一个清单.完全相同的代码,相同的数据和相同的管道在DirectRunner上运行完全正常,但在DataflowRunner上抛出以下异常.它是什么意思以及我如何解决它?

job name: `beamapp-root-0216042234-124125`

    (f14756f20f567f62): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 544, in do_work
    work_executor.execute()
  File "dataflow_worker/executor.py", line 973, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30547)
    with op.scoped_metrics_container:
  File "dataflow_worker/executor.py", line 974, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30495)
    op.start()
  File "dataflow_worker/executor.py", line 302, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12149)
    def start(self):
  File "dataflow_worker/executor.py", line 303, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12053)
    with self.scoped_start_state:
  File "dataflow_worker/executor.py", line 316, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11968)
    with self.shuffle_source.reader() as reader:
  File "dataflow_worker/executor.py", line 320, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11912)
    self.output(windowed_value)
  File "dataflow_worker/executor.py", line 152, …
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow apache-beam

8
推荐指数
1
解决办法
1657
查看次数

Apache Beam - 使用无界PCollection进行集成测试

我们正在为Apache Beam管道构建集成测试,并且遇到了一些问题.有关背景信息,请参见

有关我们管道的详情:

  • 我们使用PubsubIO我们的数据源(无界PCollection)
  • 中间变换包括自定义CombineFn和非常简单的窗口/触发策略
  • 我们最终的变换JdbcIO,用org.neo4j.jdbc.Driver写的Neo4j

目前的测试方法:

  • 在运行测试的计算机上运行Google Cloud的Pub/Sub模拟器
  • 构建内存中的Neo4j数据库并将其URI传递给我们的管道选项
  • 通过调用运行管道 OurPipeline.main(TestPipeline.convertToArgs(options)
  • 使用Google Cloud的Java Pub/Sub客户端库将消息发布到测试主题(使用Pub/Sub模拟器),该主题PubsubIO将从
  • 数据应该流经管道并最终命中我们的内存中的Neo4j实例
  • 在Neo4j中对这些数据的存在做出简单的断言

这是一个简单的集成测试,它将验证我们的整个管道是否按预期运行.

我们目前面临的问题是,当我们运行我们的管道时,它会阻塞.我们正在使用DirectRunnerpipeline.run()( pipeline.run().waitUntilFinish()),但测试似乎在运行管道后挂起.因为这是一个无限制的PCollection(在流模式下运行),管道不会终止,因此不会到达它之后的任何代码.

所以,我有几个问题:

1)有没有办法运行管道然后稍后手动停止?

2)有没有办法异步运行管道?理想情况下,它会启动管道(然后将继续轮询Pub/Sub以获取数据),然后转到负责发布到Pub/Sub的代码.

3)这种集成测试方法是否合理,或者是否有更好的方法可能更直接?这里的任何信息/指导将不胜感激.

如果我能提供任何额外的代码/背景,请告诉我 - 谢谢!

java integration-testing google-cloud-pubsub google-cloud-dataflow apache-beam

8
推荐指数
1
解决办法
961
查看次数

从Apache Beam中的多个文件夹中读取文件,并将输出映射到文件名

继续从多个文件夹中读取文件,然后使用python sdk和dataflow runner将文件名如(filecontents,filename)输出到apache beam中的bigquery.

原本以为我可以为每个文件创建一个pcollection,然后使用文件名映射文件内容.

def read_documents(pipeline):
  """Read the documents at the provided uris and returns (uri, line) pairs."""
  pcolls = []
  count = 0
  with open(TESTIN) as uris:
       for uri in uris:
    #print str(uri).strip("[]/'")
         pcolls.append(
         pipeline
         | 'Read: uri' + str(uri)  >>ReadFromText(str(uri).strip("[]/'"), compression_type = 'gzip')
         | 'WithKey: uri'  + str(uri)   >> beam.Map(lambda v, uri: (v, str(uri).strip("[]")), uri) 
         )
       return pcolls | 'FlattenReadPColls' >> beam.Flatten()
Run Code Online (Sandbox Code Playgroud)

这工作正常,但速度很慢,大约10000个文件后无法在数据流云上工作.如果超过10000个文件,它将遭受破损的管道.

目前正试图从Text.io重载ReadAllFromText函数.Text.io旨在从文件名或模式的pcollection中快速读取大量文件.如果从Google云端存储中读取并且该文件具有内容编码,则此模块中存在错误.谷歌云存储自动枪杀文件并对其进行转码,但由于某些原因,ReadAllFromText无法使用它.您必须更改文件的元数据以删除内容编码,并将ReadAllFromText上的压缩类型设置为gzip.我将此问题包含在内,以防其他人遇到ReadAllFromText问题 https://issues.apache.org/jira/browse/BEAM-1874

我目前的代码看起来像这样

class ReadFromGs(ReadAllFromText):

    def __init__(self):
        super(ReadFromGs, self).__init__(compression_type="gzip")

    def expand(self, pvalue):
        files = …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-platform google-cloud-dataflow apache-beam

8
推荐指数
1
解决办法
1217
查看次数