标签: google-cloud-dataflow

从 Google Cloud BigQuery 读取数据

我是 Pipeline 世界和 Google API DataFlow 的新手。

我想使用 sqlQuery 从 BigQuery 读取数据。当我读取所有数据库时,它工作正常。

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<TableRow> qData = p.apply(
     BigQueryIO.Read
         .named("Read")
         .from("test:DataSetTest.data"));
Run Code Online (Sandbox Code Playgroud)

但是当我使用 fromQuery 时出现错误。

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<TableRow> qData = p.apply(
     BigQueryIO.Read
         .named("Read")
         .fromQuery("SELECT * FROM DataSetTest.data"));
Run Code Online (Sandbox Code Playgroud)

错误:

线程“main”中出现异常 java.lang.IllegalArgumentException:查询“SELECT * FROM DataSetTest.data”的验证失败。如果查询依赖于管道的早期阶段,则可以使用#withoutValidation 禁用此验证。

在 com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.dryRunQuery(BigQueryIO.java:449)

在 com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.validate(BigQueryIO.java:432)

在com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:357)

在com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)

在 com.google.cloud.dataflow.sdk.values.PBegin.apply(PBegin.java:47)

在com.google.cloud.dataflow.sdk.Pipeline.apply(Pipeline.java:151)

在 Test.java.packageid.StarterPipeline.main(StarterPipeline.java:72)

引起原因:java.lang.NullPointerException:必须指定必需参数projectId。

在 com.google.api.client.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:229)

在 com.google.api.client.util.Preconditions.checkNotNull(Preconditions.java:140)

在 com.google.api.services.bigquery.Bigquery$Jobs$Query。(Bigquery.java:1751)

在 com.google.api.services.bigquery.Bigquery$Jobs.query(Bigquery.java:1724)

在 com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.dryRunQuery(BigQueryIO.java:445)

... 6 更多 …

java google-app-engine dataflow google-bigquery google-cloud-dataflow

1
推荐指数
1
解决办法
3842
查看次数

是否有 Google Dataflow MongoDB 源/接收器?

我知道 Google Dataflow 仅正式支持 Google Cloud Storage、BigQuery、Avro 文件或开箱即用的 Pub/Sub 中的文件作为数据流的 I/O。

但由于它有一个用于自定义源和接收器的 API,我想知道 MongoDB 是否有一些管道 I/O 实现?

现在,我必须将数据迁移到 BigQuery 或编写整个 Pipeline I/O 实现,然后才能知道 Google Dataflow 是否是解决我当前问题的可行解决方案。

我尝试用谷歌搜索并查看当前的 SDK 问题,但没有看到任何相关内容。我什至开始怀疑我是否错过了 Google Dataflow 概念和文档中的一些非常基本的内容,这些内容完全使使用 MongoDB 作为数据源的最初想法无效。

mongodb google-cloud-platform google-cloud-dataflow

1
推荐指数
1
解决办法
3068
查看次数

数据流 Python SDK Avro 源/同步

我希望使用 Python SDK 在 GCS 中摄取和写入 Avro 文件。目前 Avro 利用 Python SDK 可以实现这一点吗?如果是这样我该怎么做?我在源代码中看到了关于此的 TODO 评论,所以我不太乐观。

python avro google-cloud-dataflow

1
推荐指数
1
解决办法
988
查看次数

在 Python Apache Beam 中使用 value provider 参数的方法

现在我只能使用 ParDo 获取类中的 RunTime 值,还有其他方法可以像在我的函数中一样使用运行时参数吗?

这是我现在得到的代码:

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--firestore_document',default='')

def run(argv=None):

    parser = argparse.ArgumentParser()

    pipeline_options = PipelineOptions()

    user_options = pipeline_options.view_as(UserOptions)

    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:

        rows = (p 
        | 'Create inputs' >> beam.Create(['']) 
        | 'Call Firestore' >> beam.ParDo(
                CallFirestore(user_options.firestore_document)) 
        | 'Read DB2' >> beam.Map(ReadDB2))
Run Code Online (Sandbox Code Playgroud)

我希望 user_options.firestore_document 无需执行 ParDo 即可在其他功能中使用

python parameters google-cloud-platform google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
1769
查看次数

apache 光束数据流中的外部 api 调用

我有一个用例,我读入存储在谷歌云存储中的换行 json 元素并开始处理每个 json。在处理每个 json 时,我必须调用外部 API 来执行重复数据删除,无论该 json 元素之前是否被发现。我在做一个ParDoDoFn每个JSON。

我还没有看到任何在线教程说明如何从 apache beam DoFnDataflow调用外部 API 端点。

我正在使用JAVABeam 的 SDK。我学习的一些教程解释了使用startBundleFinishBundle但我不清楚如何使用它

java google-cloud-dataflow apache-beam apache-beam-io

1
推荐指数
1
解决办法
3908
查看次数

Python/Apache-Beam:如何将文本文件解析为 CSV?

我还是 Beam 的新手,但是您究竟如何从 GCS 存储桶中的 CSV 文件中读取数据?我基本上使用 Beam 将这些文件转换为 Pandas 数据帧,然后应用 sklearn 模型来“训练”这些数据。我见过的大多数示例都预先定义了标题,我希望这个 Beam 管道可以推广到标题肯定不同的任何文件。有一个名为beam_utils的库可以完成我想做的事情,但后来我遇到了这个错误:AttributeError: module 'apache_beam.io.fileio' has no attribute 'CompressionTypes'

代码示例:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# The error occurs in this import
from beam_utils.sources import CsvFileSource

options = {
    'project': 'my-project',
    'runner:': 'DirectRunner',
    'streaming': False
}

pipeline_options = PipelineOptions(flags=[], **options)

class Printer(beam.DoFn):
    def process(self, element):
        print(element)

with beam.Pipeline(options=pipeline_options) as p:  # Create the Pipeline with the specified options.

    data = (p
            | …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
2687
查看次数

Apache Beam 中的顺序执行 - Java SDK 2.18.0

嗨,我有几个查询,我想使用 Apache Beam 依次运行和保存结果,我见过一些类似的问题,但找不到答案。我习惯于使用 Airflow 设计管道,而我对 Apache Beam 还是比较陌生。我正在使用 Dataflow 运行程序。这是我现在的代码:我希望 query2 仅在 query1 结果保存到相应表后运行。我如何链接它们?

    PCollection<TableRow> resultsStep1 = getData("Run Query 1",
            "Select * FROM basetable");

    resultsStep1.apply("Save Query1 data",
            BigQueryIO.writeTableRows()
                    .withSchema(BigQueryUtils.toTableSchema(resultsStep1.getSchema()))
                    .to("resultsStep1")
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    );

    PCollection<TableRow> resultsStep2 = getData("Run Query 2",
            "Select * FROM resultsStep1");

    resultsStep2.apply("Save Query2 data",
            BigQueryIO.writeTableRows()
                    .withSchema(BigQueryUtils.toTableSchema(resultsStep2.getSchema()))
                    .to("resultsStep2")
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    );
Run Code Online (Sandbox Code Playgroud)

这是我的 getData 函数定义:

private PCollection<TableRow> getData(final String taskName, final String query) {
    return pipeline.apply(taskName,
            BigQueryIO.readTableRowsWithSchema()
                    .fromQuery(query)
                    .usingStandardSql()
                    .withCoder(TableRowJsonCoder.of()));
}
Run Code Online (Sandbox Code Playgroud)

编辑(更新):结果: You can’t sequence the completion of a …

google-cloud-dataflow apache-beam apache-beam-io

1
推荐指数
1
解决办法
753
查看次数

如何自定义 GCP Dataflow 模板?

我打算在云存储数据流模板使用Pub/Sub 到文本文件,并在写入云存储之前进行少量自定义,例如处理(按摩)PubSub 消息。

我编写了 apache-beam 管道代码,但对如何部署它感到困惑。它使用的参数将与Cloud Storage 上的 Pub/Sub 到文本文件完全相同

文档中我了解到我可以使用 Google 提供的模板之一或创建自己的模板。但是,不是创建我自己的模板,而是有更好的方法来自定义 Google 提供的模板,因为它足以满足我的大部分要求

google-cloud-platform google-cloud-dataflow

1
推荐指数
1
解决办法
378
查看次数

使用 Apache Beam Python `WriteToFiles` 转换每个窗口只写一个文件

需要一些帮助。我有一些从 Pub/Sub 读取并写入 GCS 中的批处理文件的琐碎任务,但是在使用 fileio.WriteToFiles 时遇到了一些困难

with beam.Pipeline(options=pipeline_options) as p:
  input = (p | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
             | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
             | 'Parse' >> beam.Map(parse_json)
             | ' data w' >> beam.WindowInto(
                 FixedWindows(60),
                 accumulation_mode=AccumulationMode.DISCARDING
             ))

  event_data = (input
             | 'filter events' >> beam.Filter(lambda x: x['t'] == 'event')
             | 'encode et' >> beam.Map(lambda x: json.dumps(x))
             | 'write events to file' >> fileio.WriteToFiles(
                    path='gs://extention/ga_analytics/events/', shards=0))
Run Code Online (Sandbox Code Playgroud)

窗口触发后我需要一个文件,但文件数等于来自 Pubsub 的消息数,有人可以帮助我吗? 当前输出文件, 但我只需要一个文件。

python google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
613
查看次数

使用数据流 Kafka 到 bigquery 模板时出错

我正在使用数据流 kafka 到 bigquery 模板。启动数据流作业后,它会在队列中停留一段时间,然后失败并显示以下错误:

Error occurred in the launcher container: Template launch failed. See console logs.
Run Code Online (Sandbox Code Playgroud)

查看日志时,我看到以下堆栈跟踪:

at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:192) 
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317) 
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303) 
at com.google.cloud.teleport.v2.templates.KafkaToBigQuery.run(KafkaToBigQuery.java:343) 
at com.google.cloud.teleport.v2.templates.KafkaToBigQuery.main(KafkaToBigQuery.java:222) 
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata –
Run Code Online (Sandbox Code Playgroud)

在启动工作时,我提供了以下参数:

  1. 卡夫卡主题名称
  2. 引导服务器名称
  3. bigquery 主题名称
  4. SA电子邮件
  5. 区。

我的 kafka 主题只包含消息:你好

kafka 安装在 gcp 实例中,该实例与数据流工作者位于同一区域和子网中。

dataflow apache-kafka google-cloud-platform google-cloud-dataflow

1
推荐指数
1
解决办法
382
查看次数