我注意到的一件事是,BigQueryIO.read().fromQuery()的性能比Apache Beam中的BigQueryIO.read().from()的性能要慢得多.为什么会这样?有没有办法改善它?
我最近看到 GCP 中有一个名为 Data Fusion 的新工具,在查看它时,与 Dataflow 相比,它似乎是创建 ETL 管道的一种更简单的方法。那么我们可以假设它是 Dataflow 的替代品吗?
有没有办法将侧输入应用于 Apache Beam 中的 BigQueryIO.read() 操作。
举例来说,我在 PCollection 中有一个值,我想在查询中使用它来从 BigQuery 表中获取数据。这可以使用侧面输入吗?或者在这种情况下应该使用其他东西吗?
我在类似的情况下使用了 NestedValueProvider,但我想我们只能在某个值取决于我的运行时值时使用它。或者我可以在这里使用相同的东西吗?如果我错了,请纠正我。
我试过的代码:
Bigquery bigQueryClient = start_pipeline.newBigQueryClient(options.as(BigQueryOptions.class)).build();
Tabledata tableRequest = bigQueryClient.tabledata();
PCollection<TableRow> existingData = readData.apply("Read existing data",ParDo.of(new DoFn<String,TableRow>(){
@ProcessElement
public void processElement(ProcessContext c) throws IOException
{
List<TableRow> list = c.sideInput(bqDataView);
String tableName = list.get(0).get("table").toString();
TableDataList table = tableRequest.list("projectID","DatasetID",tableName).execute();
for(TableRow row:table.getRows())
{
c.output(row);
}
}
}).withSideInputs(bqDataView));
Run Code Online (Sandbox Code Playgroud)
我得到的错误是:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize BeamTest.StarterPipeline$1@86b455
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
at BeamTest.StarterPipeline.main(StarterPipeline.java:158)
Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.Bigquery$Tabledata …Run Code Online (Sandbox Code Playgroud) 我需要从GCS存储桶中读取文件.我知道我将不得不使用GCS API /客户端库,但我找不到任何与之相关的示例.
我一直在参考GCS文档中的这个链接: GCS客户端库.但真的不能成功.如果有人能提供一个真正有用的例子.谢谢.
我有一个用例,需要将70 TB的数据从DynamoDB迁移到BigTable和Spanner。具有单个索引的表将进入BigTable,否则将进入Spanner。
通过将数据导出到S3-> GCS-> Spanner / BigTable,可以轻松处理历史负载。但是具有挑战性的部分是要处理DynamoDB上同时发生的增量流负载。DynamoDB中有300个表。
如何以最好的方式处理这件事?有人做过吗?
amazon-dynamodb google-cloud-dataflow google-cloud-bigtable apache-beam google-cloud-spanner
Dataproc尚未与Apache Ranger和Apache Sentry集成在一起。那么在Hive中推荐的用户授权方式是什么?
我是Dataproc的新手,您的回答确实会有所帮助。
我正在尝试通过 gcloud 命令运行数据流作业:
gcloud beta dataflow jobs run test --gcs-location gs://bucket/templates/templateName --parameters query="select a.name,b.salary,a.id from table1 a join table2 b on a.id = b.id"
Run Code Online (Sandbox Code Playgroud)
但我收到一条错误消息:
错误:(gcloud.beta.dataflow.jobs.run) 参数 --parameters:dict arg 的错误语法:[b.salary]。请查看gcloud topic escaping您是否需要有关转义列表或字典标志值的信息。
我看到了 gcloud 主题转义的文档,但无法弄清楚如何在此处应用它。有人可以帮我解决这个问题。
谢谢。
如果我的类扩展了DoFn,如何访问侧输入的元素?
例如:
假设我有一个ParDo变换,如:
PCollection<String> data = myData.apply("Get data",
ParDo.of(new MyClass()).withSideInputs(myDataView));
Run Code Online (Sandbox Code Playgroud)
我有一节课: -
static class MyClass extends DoFn<String,String>
{
//How to access side input here
}
Run Code Online (Sandbox Code Playgroud)
在这种情况下,c.sideInput()不起作用.
谢谢.
有没有办法检查 PCollection 是否为空?
我在 Dataflow 和 Apache Beam 的文档中没有找到任何相关内容。
如何使用 Apache Beam 中的 TextIO 将从 PubSub 收到的消息写入 GCS 中的文本文件?看到了一些方法,如 withWindowedWrites() 和 withFilenamePolicy() 但在文档中找不到任何示例。
是否可以设置BigQuery JobID或在批处理管道运行时获取它.
我知道使用BigQuery API是可能的,但如果我使用Apache Beam的BigQueryIO,它是否可能?我需要在写完BigQuery后发送确认信息表明加载完成了.