我使用以下代码执行管道:
PCollection<TableRow> test1 = ...
test1
.apply(BigQueryIO.Write
.named("test1 write")
.to("project_name:dataset_name.test1")
.withSchema(tableSchema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
PCollection<TableRow> test2 = ...
test2
.apply(BigQueryIO.Write
.named("test2 write")
.to("project_name:dataset_name.test2")
.withSchema(tableSchema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
Run Code Online (Sandbox Code Playgroud)
如果我执行管道并且表"test1"和"test2"都不存在,我将获得以下信息:
jun 09, 2015 12:29:24 PM com.google.cloud.dataflow.sdk.util.BigQueryTableInserter tryCreateTable
INFORMACIÓN: Trying to create BigQuery table: project_name:dataset_name.test1
jun 09, 2015 12:29:27 PM com.google.cloud.dataflow.sdk.util.RetryHttpRequestInitializer$LoggingHttpBackoffUnsuccessfulResponseHandler handleResponse
ADVERTENCIA: Request failed with code 404, will NOT retry: https://www.googleapis.com/bigquery/v2/projects/pragmatic-armor-455/datasets/audit/tables/project_name:dataset_name.test2/insertAll
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
"code" : 404,
"errors" : [ {
"domain" : "global",
"message" : …Run Code Online (Sandbox Code Playgroud) 我们正在实现一个相当复杂的管道,该管道由链接在一起的多个GroupBy和Combine组成.除此之外,管道还应用了KeyedPCollectionTuple.
此管道已成功执行,但图表未显示在Google开发人员的控制台中.仅显示日志.这些步骤也缺失了.
有没有办法让他们展示?
我想在perKey的基础上迭代KV pCollection的值.我使用下面的代码来组合使用自定义类,
PCollection<KV<String, String>> combinesAttributes =
valExtract.get(extAttUsers).apply(Combine.<String, String>perKey(
new CombineAttributes()));
Run Code Online (Sandbox Code Playgroud)
以下是我的自定义组合课程,
public static class CombineAttributes implements SerializableFunction<Iterable<String>, String> {
@Override
public String apply(Iterable<String> input) {...}..}
Run Code Online (Sandbox Code Playgroud)
这适用于小输入,但对于大输入,联合收割机不如预期.输出只合并了一些键的值,其他的则丢失了.我假设输出只包含来自一个节点的数据.
https://cloud.google.com/dataflow/model/combine中的文档提及使用CombineFn以便在所有节点中组合每个键的完整值集合.
但是,当我更改下面的自定义组合功能时,我收到以下错误,
incompatible types: CombineAttributes cannot be converted to com.google.cloud.dataflow.sdk.transforms.SerializableFunction<java.lang.Iterable<java.lang.String>,java.lang.String>
Run Code Online (Sandbox Code Playgroud)
结合功能
public static class CombineAttributes extends CombineFn<Iterable<String>, CombineAttributes.Accum, String> {
public static class Accum {
List<String> inputList = new ArrayList<String>();
}
public Accum createAccumulator() { return new Accum(); }
public Accum addInput(Accum accum, Iterable<String> input) {
for (String item : input) {
accum.inputList.add(item); …Run Code Online (Sandbox Code Playgroud) 我们正在Google Cloud上建立机器学习管道,利用GC ML-Engine进行分布式TensorFlow培训和模型服务,并利用DataFlow进行分布式预处理作业.
我们希望在Google Cloud上运行我们的Apache Beam应用程序作为DataFlow作业.看看ML-Engine样本 ,似乎可以得到tensorflow_transform.beam.impl AnalyzeAndTransformDataset来指定使用哪个PipelineRunner,如下所示:
from tensorflow_transform.beam import impl as tft
pipeline_name = "DirectRunner"
p = beam.Pipeline(pipeline_name)
p | "xxx" >> xxx | "yyy" >> yyy | tft.AnalyzeAndTransformDataset(...)
Run Code Online (Sandbox Code Playgroud)
TemplatingDataflowPipelineRunner提供了将预处理开发与参数化操作分开的功能 - 请参阅此处:https://cloud.google.com/dataflow/docs/templates/overview-基本上:
现在的问题是:你能告诉我,我们怎能用tf.Transform利用TemplatingDataflowPipelineRunner?
google-cloud-dataflow tensorflow apache-beam google-cloud-ml google-cloud-ml-engine
DataFlow本身有ETL,计算和流媒体处理为什么我们需要去谷歌的Dataproc?
如何使用 Apache Beam 中的 TextIO 将从 PubSub 收到的消息写入 GCS 中的文本文件?看到了一些方法,如 withWindowedWrites() 和 withFilenamePolicy() 但在文档中找不到任何示例。
假设我们有一个有 4 个 CPU 内核的工人。Dataflow 工作机器中的并行性如何配置?我们是否并行超过内核数量?
在哪里可以获得此类信息?
当您需要从数据流作业中的bigquery中的一个或多个表中读取所有数据时,我会说有两种方法.第一种方法是使用BigQueryIOwith from,它读取有问题的表,第二种方法是使用fromQuery指定读取同一个表中所有数据的查询的位置.所以我的问题是:
我没有在文档中找到任何关于此的内容,但我真的很想知道.我想可能read更快,因为您不需要运行扫描数据的查询,这意味着它更类似于您在BigQueryUI中的预览功能.如果这是真的,它也可能便宜得多,但如果它们的成本相同则有意义.
简而言之,有什么区别:
BigQueryIO.read(...).from(tableName)
Run Code Online (Sandbox Code Playgroud)
和
BigQueryIO.read(...).fromQuery("SELECT * FROM " + tableName)
Run Code Online (Sandbox Code Playgroud) 这是我的输入文件的样子:
{"Id": 1, "Address": {"Street":"MG Road","City":"Pune"}}
{"Id": 2, "Address": {"City":"Mumbai"}}
{"Id": 3, "Address": {"Street":"XYZ Road"}}
{"Id": 4}
{"Id": 5, "PhoneNumber": 12345678, "Address": {"Street":"ABCD Road", "City":"Bangalore"}}
Run Code Online (Sandbox Code Playgroud)
在我的数据流管道中,我如何动态确定每行中存在哪些字段以符合 BigQuery 表架构。例如,在第 2 行中,Street丢失了。我希望Address.StreetBigQuery 中的列条目为"N/A"ornull并且不希望管道因架构更改或丢失数据而失败。
在使用 Python 写入 BigQuery 之前,如何在数据流作业中处理此逻辑?
python google-bigquery google-cloud-platform google-cloud-dataflow google-cloud-functions
由于文档仅适用于JAVA,我无法理解其含义.
它声明 - "虽然ParDo总是产生一个主输出PCollection(作为应用的返回值),你也可以让你的ParDo产生任意数量的额外输出PCollections.如果你选择有多个输出,你的ParDo将返回所有的输出PCollections(包括主输出)捆绑在一起.例如,在Java中,输出PCollections捆绑在一个类型安全的PCollectionTuple中."
我理解捆绑在一起意味着什么,但如果我在我的DoFn中产生一个标签,它是否会产生一个包含所有其他输出的空包,并在代码中遇到它们时产生其他输出?或者它等待所有产量准备好输入并将它们全部输出到一起?
文档中没有太多清晰度.虽然我认为它不会等待,只是遇到收益,但我仍然需要了解发生了什么.