小编ris*_*097的帖子

BigQueryIO.read().fromQuery性能缓慢

我注意到的一件事是,BigQueryIO.read().fromQuery()的性能比Apache Beam中的BigQueryIO.read().from()的性能要慢得多.为什么会这样?有没有办法改善它?

google-bigquery google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
1185
查看次数

谷歌云数据流与谷歌云数据融合

我最近看到 GCP 中有一个名为 Data Fusion 的新工具,在查看它时,与 Dataflow 相比,它似乎是创建 ETL 管道的一种更简单的方法。那么我们可以假设它是 Dataflow 的替代品吗?

google-cloud-platform google-cloud-dataflow

5
推荐指数
3
解决办法
6073
查看次数

将侧输入应用于 Apache Beam 中的 BigQueryIO.read 操作

有没有办法将侧输入应用于 Apache Beam 中的 BigQueryIO.read() 操作。

举例来说,我在 PCollection 中有一个值,我想在查询中使用它来从 BigQuery 表中获取数据。这可以使用侧面输入吗?或者在这种情况下应该使用其他东西吗?

我在类似的情况下使用了 NestedValueProvider,但我想我们只能在某个值取决于我的运行时值时使用它。或者我可以在这里使用相同的东西吗?如果我错了,请纠正我。

我试过的代码:

Bigquery bigQueryClient = start_pipeline.newBigQueryClient(options.as(BigQueryOptions.class)).build();
    Tabledata tableRequest = bigQueryClient.tabledata();

PCollection<TableRow> existingData = readData.apply("Read existing data",ParDo.of(new DoFn<String,TableRow>(){
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException
    {
        List<TableRow> list = c.sideInput(bqDataView);
        String tableName = list.get(0).get("table").toString();
        TableDataList table = tableRequest.list("projectID","DatasetID",tableName).execute();

        for(TableRow row:table.getRows())
        {
            c.output(row);
        }
    }
    }).withSideInputs(bqDataView));
Run Code Online (Sandbox Code Playgroud)

我得到的错误是:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize BeamTest.StarterPipeline$1@86b455
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
    at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
    at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
    at BeamTest.StarterPipeline.main(StarterPipeline.java:158)
Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.Bigquery$Tabledata …
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow apache-beam

4
推荐指数
1
解决办法
2134
查看次数

在Apache Beam中从GCS读取文件

我需要从GCS存储桶中读取文件.我知道我将不得不使用GCS API /客户端库,但我找不到任何与之相关的示例.

我一直在参考GCS文档中的这个链接: GCS客户端库.但真的不能成功.如果有人能提供一个真正有用的例子.谢谢.

java google-cloud-dataflow apache-beam

4
推荐指数
1
解决办法
3257
查看次数

从DynamoDB迁移到Spanner / BigTable

我有一个用例,需要将70 TB的数据从DynamoDB迁移到BigTable和Spanner。具有单个索引的表将进入BigTable,否则将进入Spanner。

通过将数据导出到S3-> GCS-> Spanner / BigTable,可以轻松处理历史负载。但是具有挑战性的部分是要处理DynamoDB上同时发生的增量流负载。DynamoDB中有300个表。

如何以最好的方式处理这件事?有人做过吗?

amazon-dynamodb google-cloud-dataflow google-cloud-bigtable apache-beam google-cloud-spanner

4
推荐指数
1
解决办法
64
查看次数

在Dataproc中进行Hive授权

Dataproc尚未与Apache Ranger和Apache Sentry集成在一起。那么在Hive中推荐的用户授权方式是什么?

我是Dataproc的新手,您的回答确实会有所帮助。

google-cloud-dataproc

2
推荐指数
1
解决办法
438
查看次数

Gcloud 主题在 Apache Beam 中转义

我正在尝试通过 gcloud 命令运行数据流作业:

gcloud beta dataflow jobs run test --gcs-location gs://bucket/templates/templateName --parameters query="select a.name,b.salary,a.id from table1 a join table2 b on a.id = b.id"
Run Code Online (Sandbox Code Playgroud)

但我收到一条错误消息:

错误:(gcloud.beta.dataflow.jobs.run) 参数 --parameters:dict arg 的错误语法:[b.salary]。请查看gcloud topic escaping您是否需要有关转义列表或字典标志值的信息。

我看到了 gcloud 主题转义的文档,但无法弄清楚如何在此处应用它。有人可以帮我解决这个问题。

谢谢。

google-cloud-dataflow apache-beam

2
推荐指数
1
解决办法
1562
查看次数

在非轻量级DoFn中访问侧输入

如果我的类扩展了DoFn,如何访问侧输入的元素?

例如:

假设我有一个ParDo变换,如:

PCollection<String> data = myData.apply("Get data",
    ParDo.of(new MyClass()).withSideInputs(myDataView));
Run Code Online (Sandbox Code Playgroud)

我有一节课: -

static class MyClass extends DoFn<String,String>
{
    //How to access side input here
}
Run Code Online (Sandbox Code Playgroud)

在这种情况下,c.sideInput()不起作用.

谢谢.

google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
774
查看次数

检查 PCollection 是否为空 - Apache Beam

有没有办法检查 PCollection 是否为空?

我在 Dataflow 和 Apache Beam 的文档中没有找到任何相关内容。

google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
5779
查看次数

使用 Apache Beam 将流数据写入 GCS

如何使用 Apache Beam 中的 TextIO 将从 PubSub 收到的消息写入 GCS 中的文本文件?看到了一些方法,如 withWindowedWrites() 和 withFilenamePolicy() 但在文档中找不到任何示例。

google-cloud-storage google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
4187
查看次数

在执行BigQueryIO.write()时获取/设置BigQuery作业ID

是否可以设置BigQuery JobID或在批处理管道运行时获取它.
我知道使用BigQuery API是可能的,但如果我使用Apache Beam的BigQueryIO,它是否可能?我需要在写完BigQuery后发送确认信息表明加载完成了.

google-bigquery google-cloud-dataflow apache-beam

0
推荐指数
1
解决办法
229
查看次数