标签: google-cloud-dataflow

当我尝试在同一个管道执行时创建不同的BigQuery表时出错

我使用以下代码执行管道:

PCollection<TableRow> test1 = ...
test1
    .apply(BigQueryIO.Write
        .named("test1 write")
        .to("project_name:dataset_name.test1")
        .withSchema(tableSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

PCollection<TableRow> test2 = ...
test2
    .apply(BigQueryIO.Write
        .named("test2 write")
        .to("project_name:dataset_name.test2")
        .withSchema(tableSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
Run Code Online (Sandbox Code Playgroud)

如果我执行管道并且表"test1"和"test2"都不存在,我将获得以下信息:

jun 09, 2015 12:29:24 PM com.google.cloud.dataflow.sdk.util.BigQueryTableInserter tryCreateTable
INFORMACIÓN: Trying to create BigQuery table: project_name:dataset_name.test1
jun 09, 2015 12:29:27 PM com.google.cloud.dataflow.sdk.util.RetryHttpRequestInitializer$LoggingHttpBackoffUnsuccessfulResponseHandler handleResponse
ADVERTENCIA: Request failed with code 404, will NOT retry: https://www.googleapis.com/bigquery/v2/projects/pragmatic-armor-455/datasets/audit/tables/project_name:dataset_name.test2/insertAll
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : …
Run Code Online (Sandbox Code Playgroud)

google-bigquery google-cloud-dataflow

1
推荐指数
1
解决办法
325
查看次数

Complex Cloud Dataflow管道不会在开发人员控制台中显示执行图

我们正在实现一个相当复杂的管道,该管道由链接在一起的多个GroupBy和Combine组成.除此之外,管道还应用了KeyedPCollectionTuple.

此管道已成功执行,但图表未显示在Google开发人员的控制台中.仅显示日志.这些步骤也缺失了.

有没有办法让他们展示?

google-cloud-dataflow

1
推荐指数
1
解决办法
109
查看次数

使用CombineFn从所有节点累积数据后,合并每个键的所有值

我想在perKey的基础上迭代KV pCollection的值.我使用下面的代码来组合使用自定义类,

PCollection<KV<String, String>> combinesAttributes =
              valExtract.get(extAttUsers).apply(Combine.<String, String>perKey(
                      new CombineAttributes()));
Run Code Online (Sandbox Code Playgroud)

以下是我的自定义组合课程,

public static class CombineAttributes implements SerializableFunction<Iterable<String>, String> {
   @Override
   public String apply(Iterable<String> input) {...}..}
Run Code Online (Sandbox Code Playgroud)

这适用于小输入,但对于大输入,联合收割机不如预期.输出只合并了一些键的值,其他的则丢失了.我假设输出只包含来自一个节点的数据.

https://cloud.google.com/dataflow/model/combine中的文档提及使用CombineFn以便在所有节点中组合每个键的完整值集合.

但是,当我更改下面的自定义组合功能时,我收到以下错误,

incompatible types: CombineAttributes cannot be converted to com.google.cloud.dataflow.sdk.transforms.SerializableFunction<java.lang.Iterable<java.lang.String>,java.lang.String>
Run Code Online (Sandbox Code Playgroud)

结合功能

public static class CombineAttributes extends CombineFn<Iterable<String>, CombineAttributes.Accum, String> {

public static class Accum {
  List<String> inputList = new ArrayList<String>();
}
public Accum createAccumulator() { return new Accum(); }
public Accum addInput(Accum accum, Iterable<String> input) {
  for (String item : input) {
    accum.inputList.add(item); …
Run Code Online (Sandbox Code Playgroud)

java google-cloud-dataflow

1
推荐指数
1
解决办法
831
查看次数

如何在tf.Transform中使用Google DataFlow Runner和Templates?

我们正在Google Cloud上建立机器学习管道,利用GC ML-Engine进行分布式TensorFlow培训和模型服务,并利用DataFlow进行分布式预处理作业.

我们希望在Google Cloud上运行我们的Apache Beam应用程序作为DataFlow作业.看看ML-Engine样本 ,似乎可以得到tensorflow_transform.beam.impl AnalyzeAndTransformDataset来指定使用哪个PipelineRunner,如下所示:

from tensorflow_transform.beam import impl as tft
pipeline_name = "DirectRunner"
p = beam.Pipeline(pipeline_name) 
p | "xxx" >> xxx | "yyy" >> yyy | tft.AnalyzeAndTransformDataset(...)
Run Code Online (Sandbox Code Playgroud)

TemplatingDataflowPipelineRunner提供了将预处理开发与参数化操作分开的功能 - 请参阅此处:https://cloud.google.com/dataflow/docs/templates/overview-基本上:

  • A)在PipelineOptions派生类型中,将选项类型更改为ValueProvider(python方式:类型推断或类型提示???)
  • B)将跑步者改为TemplatingDataflowPipelineRunner
  • C) mvn原型:生成以GCS存储模板(python方式:像TF Hypertune一样的yaml文件???)
  • D)运行gcloud beta数据流作业--gcs-location -parameters

现在的问题是:你能告诉我,我们怎能用tf.Transform利用TemplatingDataflowPipelineRunner

google-cloud-dataflow tensorflow apache-beam google-cloud-ml google-cloud-ml-engine

1
推荐指数
1
解决办法
656
查看次数

谷歌的数据流和谷歌的数据流有什么区别?

DataFlow本身有ETL,计算和流媒体处理为什么我们需要去谷歌的Dataproc?

google-cloud-dataflow google-cloud-dataproc

1
推荐指数
1
解决办法
2004
查看次数

使用 Apache Beam 将流数据写入 GCS

如何使用 Apache Beam 中的 TextIO 将从 PubSub 收到的消息写入 GCS 中的文本文件?看到了一些方法,如 withWindowedWrites() 和 withFilenamePolicy() 但在文档中找不到任何示例。

google-cloud-storage google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
4187
查看次数

谷歌云数据流工作线程

假设我们有一个有 4 个 CPU 内核的工人。Dataflow 工作机器中的并行性如何配置?我们是否并行超过内核数量?

在哪里可以获得此类信息?

google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
2458
查看次数

当你在数据流中使用`fromTable` vs`fromQuery("SELECT*...")`时,`BigQueryIO`有区别吗?

当您需要从数据流作业中的bigquery中的一个或多个表中读取所有数据时,我会说有两种方法.第一种方法是使用BigQueryIOwith from,它读取有问题的表,第二种方法是使用fromQuery指定读取同一个表中所有数据的查询的位置.所以我的问题是:

  • 使用其中一个是否有任何成本或性能优势?

我没有在文档中找到任何关于此的内容,但我真的很想知道.我想可能read更快,因为您不需要运行扫描数据的查询,这意味着它更类似于您在BigQueryUI中的预览功能.如果这是真的,它也可能便宜得多,但如果它们的成本相同则有意义.

简而言之,有什么区别:

BigQueryIO.read(...).from(tableName)
Run Code Online (Sandbox Code Playgroud)

BigQueryIO.read(...).fromQuery("SELECT * FROM " + tableName)
Run Code Online (Sandbox Code Playgroud)

dataflow google-bigquery google-cloud-dataflow

1
推荐指数
1
解决办法
356
查看次数

如何在将 JSON 文件加载到 BigQuery 表时管理/处理架构更改

这是我的输入文件的样子:

{"Id": 1, "Address": {"Street":"MG Road","City":"Pune"}}
{"Id": 2, "Address": {"City":"Mumbai"}}
{"Id": 3, "Address": {"Street":"XYZ Road"}}
{"Id": 4}
{"Id": 5, "PhoneNumber": 12345678, "Address": {"Street":"ABCD Road", "City":"Bangalore"}}
Run Code Online (Sandbox Code Playgroud)

在我的数据流管道中,我如何动态确定每行中存在哪些字段以符合 BigQuery 表架构。例如,在第 2 行中,Street丢失了。我希望Address.StreetBigQuery 中的列条目为"N/A"ornull并且不希望管道因架构更改或丢失数据而失败。

在使用 Python 写入 BigQuery 之前,如何在数据流作业中处理此逻辑?

python google-bigquery google-cloud-platform google-cloud-dataflow google-cloud-functions

1
推荐指数
1
解决办法
1152
查看次数

ParDo中的侧输出 Apache Beam Python SDK

由于文档仅适用于JAVA,我无法理解其含义.

它声明 - "虽然ParDo总是产生一个主输出PCollection(作为应用的返回值),你也可以让你的ParDo产生任意数量的额外输出PCollections.如果你选择有多个输出,你的ParDo将返回所有的输出PCollections(包括主输出)捆绑在一起.例如,在Java中,输出PCollections捆绑在一个类型安全的PCollectionTuple中."

我理解捆绑在一起意味着什么,但如果我在我的DoFn中产生一个标签,它是否会产生一个包含所有其他输出的空包,并在代码中遇到它们时产生其他输出?或者它等待所有产量准备好输入并将它们全部输出到一起?

文档中没有太多清晰度.虽然我认为它不会等待,只是遇到收益,但我仍然需要了解发生了什么.

google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
1796
查看次数