What is the difference between a pane and a window? Incoming elements are grouped into windows, so what does a pane contain?
I took the following code from the Beam documentation:
.of(new DoFn<String, String>() {
  @ProcessElement  // required on the processing method in the current DoFn style
  public void processElement(@Element String word, PaneInfo paneInfo) {
    // paneInfo describes the firing (pane) of the window that produced this element
  }
})
Does each element belong to exactly one pane, or to multiple panes? I need a simple analogy to understand panes versus windows.
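As I understand the Beam model (this is my reading, not an official definition), a window groups elements by event time, while a pane identifies one particular trigger firing of that window: a window can fire several times over its lifetime (early, on-time, late), and each emitted element carries exactly one PaneInfo describing the firing that produced it. A hedged sketch of inspecting that metadata; the class and log format are illustrative, not from the docs snippet above:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;

// Illustrative DoFn: logs which pane (trigger firing) of the element's window this is.
class InspectPaneFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(@Element String word, PaneInfo paneInfo, OutputReceiver<String> out) {
    // A window can fire several times; each firing is one pane with its own index and timing.
    System.out.printf(
        "word=%s timing=%s paneIndex=%d first=%b last=%b%n",
        word, paneInfo.getTiming(), paneInfo.getIndex(), paneInfo.isFirst(), paneInfo.isLast());
    out.output(word);
  }
}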
How do I choose between these two write methods when using BigQueryIO?
I found the official documentation, which mentions that writeTableRows() is not recommended, but I don't understand why.
I serialize my Java POJO with gson and write a TableRow directly using writeTableRows(); if I use write(), I have to build the TableRow object manually. Question: why is write() preferred, and what is the rationale for choosing it? Thanks.
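For illustration, a hedged sketch of the two variants as I understand them: writeTableRows() takes a PCollection<TableRow> you build yourself, while the generic write() keeps your own type and converts it at the sink via withFormatFunction. The POJO, table spec, and field names below are made up for the example, not from the question:

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.values.PCollection;

class BigQueryWriteVariants {

  // Hypothetical POJO used only for this example.
  static class Person {
    String name;
    int age;
  }

  // Variant 1: build TableRow objects yourself and hand them to writeTableRows().
  static void viaTableRows(PCollection<TableRow> rows, TableSchema schema) {
    rows.apply(BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.people")   // illustrative table spec
        .withSchema(schema));
  }

  // Variant 2: keep your own type and let write() convert it with a format function.
  static void viaFormatFunction(PCollection<Person> people, TableSchema schema) {
    people.apply(BigQueryIO.<Person>write()
        .to("my-project:my_dataset.people")   // illustrative table spec
        .withSchema(schema)
        .withFormatFunction(p -> new TableRow().set("name", p.name).set("age", p.age)));
  }
}

As I read the recommendation, the main point is that TableRow is a relatively heavyweight, JSON-style class, so keeping your own type through the pipeline and converting only at the sink tends to be cheaper; that is my interpretation rather than the official wording.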
I am new to Google Cloud Platform, and I am trying Google Dataflow for the first time for a graduate course project. What I want to do is write an automated load job that loads files from a bucket in my Cloud Storage and inserts the data from them into a BigQuery table.
I get the data as a PCollection<String>, but to insert it into BigQuery I apparently need to transform it into a PCollection<TableRow>. So far I have not found a solid answer for this.
Here is my code:
public static void main(String[] args) {
  // Defining the schema of the BigQuery table
  List<TableFieldSchema> fields = new ArrayList<>();
  fields.add(new TableFieldSchema().setName("Datetime").setType("TIMESTAMP"));
  fields.add(new TableFieldSchema().setName("Consumption").setType("FLOAT"));
  fields.add(new TableFieldSchema().setName("MeterID").setType("STRING"));
  TableSchema schema = new TableSchema().setFields(fields);

  // Creating the pipeline
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
  Pipeline p = Pipeline.create(options);

  // Getting the data from cloud storage
  PCollection<String> lines = p.apply(TextIO.Read.named("ReadCSVFromCloudStorage")
      .from("gs://mybucket/myfolder/certainCSVfile.csv"));

  // Probably need to do some transform here (see the sketch below) ...

  // Inserting data into BigQuery
  lines.apply(BigQueryIO.Write
      .named("WriteToBigQuery")
      .to("projectID:datasetID.tableID") // note: dataset and table are separated by a dot
      .withSchema(schema)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

  // Without this the pipeline is never executed
  p.run();
}
I am probably just forgetting something basic here, so I hope you can help me out...
java google-cloud-storage google-bigquery google-cloud-dataflow
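For the missing transform, a hedged sketch of the kind of ParDo that usually sits between TextIO and BigQueryIO in the 1.x Dataflow SDK used here: it splits each CSV line and builds a TableRow whose field names match the schema above. The column order, the absence of a header row, and the lack of quote handling are assumptions:

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

// Sketch: turn each CSV line into a TableRow matching the schema defined above.
// Assumes the columns appear in the order Datetime,Consumption,MeterID and that
// the file has no header row and no quoted fields.
class CsvLineToTableRowFn extends DoFn<String, TableRow> {
  @Override
  public void processElement(ProcessContext c) {
    String[] parts = c.element().split(",");
    c.output(new TableRow()
        .set("Datetime", parts[0])
        .set("Consumption", Double.parseDouble(parts[1]))
        .set("MeterID", parts[2]));
  }
}

// Usage between the read and the write:
//   PCollection<TableRow> rows = lines.apply(ParDo.of(new CsvLineToTableRowFn()));
//   rows.apply(BigQueryIO.Write ... );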
I am pushing data from Google Dataflow to Google BigQuery. I have TableRow data objects, and one of the columns in the TableRow contains an array of strings.
From here I found that Google BigQuery supports an Array column type, so I tried to create the table with ARRAY<SCHEMA> as the type. But I receive the following error:
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
"code" : 400,
"errors" : [ {
"domain" : "global",
"message" : "Invalid value for: ARRAY<STRING> is not a valid value",
"reason" : "invalid"
} ],
"message" : "Invalid value for: ARRAY<STRING> is not a valid value"
}
com.google.cloud.dataflow.sdk.util.UserCodeException.wrapIf(UserCodeException.java:47)
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.wrapUserCodeException(DoFnRunnerBase.java:369)
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.finishBundle(DoFnRunnerBase.java:162)
com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:194)
com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.finishBundle(ForwardingParDoFn.java:47)
Here is the code I use to publish the values into BigQuery:
.apply(BigQueryIO.Write.named("Write enriched data")
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withSchema(getSchema())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.to("table_name"));
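Before the schema code below, a hedged note: in the legacy TableSchema API used by this SDK, an array column is declared by giving the element type (e.g. STRING) together with mode REPEATED, rather than the Standard SQL ARRAY<STRING> syntax, which appears to be what the 400 error above is complaining about. A sketch with an illustrative field name:

import com.google.api.services.bigquery.model.TableFieldSchema;

class ArrayColumnSketch {
  // "tags" is an illustrative field name. The element type goes in setType(),
  // and the array-ness is expressed with mode REPEATED, not with ARRAY<STRING>.
  static TableFieldSchema arrayOfStringsField() {
    return new TableFieldSchema()
        .setName("tags")
        .setType("STRING")
        .setMode("REPEATED");
  }
}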
And here is the schema construction:
private static TableSchema getSchema() {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("column1").setType("STRING"));
fields.add(new …

I set up a small test with Google Dataflow (apache-beam). The use case for the experiment is to take a (csv) file and write selected columns to a (txt) file.
The code for the experiment is as follows:
from __future__ import absolute_import
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class EmitColDoFn(beam.DoFn):
    first = True
    header = ""

    def __init__(self, i):
        super(EmitColDoFn, self).__init__()
        self.line_count = Metrics.counter(self.__class__, 'lines')
        self.i = i

    def process(self, element):
        if self.first:
            self.header = element
            self.first = False
        else:
            self.line_count.inc()
            cols = re.split(',', element)
            return (cols[self.i],) …

python-2.7 google-cloud-platform google-cloud-dataflow apache-beam gcp
Is it possible to set the BigQuery job ID, or to get it while a batch pipeline is running?
I know this is possible with the BigQuery API, but is it possible if I use Apache Beam's BigQueryIO? I need to send a confirmation after the write to BigQuery has finished, indicating that the load is complete.
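As far as I know, the BigQueryIO sink in the SDK versions of this era does not surface the underlying load job ID directly, so a common hedged workaround is to wait for the whole batch pipeline to finish and send the confirmation from the launcher program; the notification call below is a placeholder:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;

class RunAndConfirm {
  static void runAndConfirm(Pipeline pipeline) {
    // Block until the batch pipeline (including the BigQuery load) has finished.
    PipelineResult result = pipeline.run();
    PipelineResult.State state = result.waitUntilFinish();

    if (state == PipelineResult.State.DONE) {
      // Placeholder: send the "load complete" confirmation here
      // (Pub/Sub message, HTTP callback, email, ...).
      System.out.println("BigQuery load finished, sending confirmation...");
    }
  }
}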
I am trying to write a job in Dataflow using Apache Beam. This job needs to take an input and transform it into my custom object. The object represents a memory test, and contains fixed attributes such as timestamp, name, ... plus a list of partitions with their own attributes:
public class TestResult {
  String testName;
  String testId;
  String testStatus;
  String testResult;
  List<Partition> testPartitions;
}

public class Partition {
  String testId;
  String filesystem;
  String mountedOn;
  String usePercentage;
  String available;
  String size;
  String used;
}
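Before the transform itself, a hedged sketch of the shape such nested data usually takes in BigQuery: the partition list maps to a field of type RECORD with mode REPEATED, and each Partition becomes its own nested TableRow collected into a List. The field names mirror the POJOs above (only a subset, for brevity), but the schema itself is an assumption:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class NestedRowSketch {

  // Sketch: declare "testPartitions" as a repeated RECORD with its own sub-fields.
  static TableFieldSchema partitionsField() {
    return new TableFieldSchema()
        .setName("testPartitions")
        .setType("RECORD")
        .setMode("REPEATED")
        .setFields(Arrays.asList(
            new TableFieldSchema().setName("filesystem").setType("STRING"),
            new TableFieldSchema().setName("mountedOn").setType("STRING"),
            new TableFieldSchema().setName("usePercentage").setType("STRING")));
  }

  // Sketch: build the parent row with a List<TableRow> for the repeated field.
  static TableRow toRow(TestResult result) {
    List<TableRow> partitionRows = new ArrayList<>();
    for (Partition p : result.testPartitions) {
      partitionRows.add(new TableRow()
          .set("filesystem", p.filesystem)
          .set("mountedOn", p.mountedOn)
          .set("usePercentage", p.usePercentage));
    }
    return new TableRow()
        .set("testName", result.testName)
        .set("testId", result.testId)
        .set("testPartitions", partitionRows);
  }
}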
My final transform takes this TestResult object and converts it into a table row:
static class TestResultToRowConverter extends DoFn<TestResult, TableRow> {
  /**
   * In this example, put the whole string into single BigQuery field.
   */
  @ProcessElement
  public void processElement(ProcessContext c) {
    System.out.println("setting TestResult-> TestResult:" + c.element());
    c.output(new TableRow()
        .set("testName", c.element().testName)
        .set("testId", c.element().testId)
        .set("testStatus", c.element().testStatus)
        .set("testResult", c.element().testResult)
        .set("memoryTestData", "example data test"));
    for …

I call Pub/Sub via a REST request. I am trying to put serialized data onto a Pub/Sub topic, have it flow into Dataflow, and finally land in BigQuery, where a table is defined.
Here is the layout of that JSON data:
[
{
"age": "58",
"job": "management",
"marital": "married",
"education": "tertiary",
"default": "no",
"balance": "2143",
"housing": "yes",
"loan": "no",
"contact": "unknown",
"day": "5",
"month": "may",
"duration": "261",
"campaign": "1",
"pdays": "-1",
"previous": "0",
"poutcome": "unknown",
"y": "no"
}
]
Now, to form a correct JSON body, this needs to go into the following request format that Pub/Sub recognizes:
{
"messages": [{
"attributes": {
"key": "iana.org/language_tag",
"value": "en"
},
"data": "%DATA%"
}]
}
Now, the Pub/Sub REST reference states that the "data" field needs to be Base64-encoded, which is what I did. The final JSON format looks like this (%DATA% replaced by the Base64 encoding of the original message data):
{
"messages": [{
"attributes": {
"key": "iana.org/language_tag",
"value": "en"
},
"data": "Ww0KICB7DQogICAgImFnZSI6ICI1OCIsDQogICAgImpvYiI6ICJtYW5hZ2VtZW50IiwNCiAgICAibWFyaXRhbCI6ICJtYXJyaWVkIiwNCiAgICAiZWR1Y2F0aW9uIjogInRlcnRpYXJ5IiwNCiAgICAiZGVmYXVsdCI6ICJubyIsDQogICAgImJhbGFuY2UiOiAiMjE0MyIsDQogICAgImhvdXNpbmciOiAieWVzIiwNCiAgICAibG9hbiI6ICJubyIsDQogICAgImNvbnRhY3QiOiAidW5rbm93biIsDQogICAgImRheSI6ICI1IiwNCiAgICAibW9udGgiOiAibWF5IiwNCiAgICAiZHVyYXRpb24iOiAiMjYxIiwNCiAgICAiY2FtcGFpZ24iOiAiMSIsDQogICAgInBkYXlzIjogIi0xIiwNCiAgICAicHJldmlvdXMiOiAiMCIsDQogICAgInBvdXRjb21lIjogInVua25vd24iLA0KICAgICJ5IjogIm5vIg0KICAgIH0NCl0="
}]
}
Pub/Sub accepts this data and it then goes into Dataflow, but that is where everything breaks. Dataflow tries to deserialize the information, but fails with the following message:
(efdf538fc01f50b0): java.lang.RuntimeException: Unable to parse input
com.google.cloud.teleport.templates.common.BigQueryConverters$JsonToTableRow$1.apply(BigQueryConverters.java:58) …

json google-bigquery google-cloud-platform google-cloud-pubsub google-cloud-dataflow
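One hedged reading of that stack trace is that the template's JsonToTableRow step expects each Pub/Sub message to contain a single JSON object, whereas the Base64-decoded payload here is a JSON array wrapped in [ ... ]. A sketch of a DoFn that would accept either shape, using Jackson (the library choice and string-only field handling are assumptions, not what the template does):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.bigquery.model.TableRow;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;

// Sketch: parse a payload that may be a JSON object or a JSON array of objects,
// emitting one TableRow per object. Values are copied as strings, matching the sample data.
class JsonPayloadToTableRowFn extends DoFn<String, TableRow> {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  @ProcessElement
  public void processElement(@Element String json, OutputReceiver<TableRow> out) throws Exception {
    JsonNode root = MAPPER.readTree(json);
    if (root.isArray()) {
      for (JsonNode obj : root) {
        out.output(toRow(obj));
      }
    } else {
      out.output(toRow(root));
    }
  }

  private static TableRow toRow(JsonNode obj) {
    TableRow row = new TableRow();
    Iterator<Map.Entry<String, JsonNode>> fields = obj.fields();
    while (fields.hasNext()) {
      Map.Entry<String, JsonNode> field = fields.next();
      row.set(field.getKey(), field.getValue().asText());
    }
    return row;
  }
}

The simpler alternative is to publish one JSON object per Pub/Sub message, without the wrapping [ ], which appears to be what the stock template expects.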
I am trying to optimize a pipeline that pulls messages from PubSubIO and sends them to a third-party API. One interesting observation is that if I put a GroupBy followed by a "Degroup" transform right after PubSubIO.read, the throughput of the pipeline increases significantly. I added the GroupBy just to prevent fusion optimization, and now I am wondering how transforms get fused in a given pipeline.
What is the best way to find out what the pipeline looks like after fusion?
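For the fusion-breaking step itself, a hedged sketch of the common idiom in the Beam 2.x SDK: insert a Reshuffle (effectively the GroupBy/"Degroup" pair as one transform) between the read and the expensive ParDo, so the runner cannot fuse them into a single stage. The transform names, subscription, and API-calling DoFn are illustrative:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;

class FusionBreakSketch {
  // Illustrative DoFn standing in for the slow third-party API call.
  static class CallExternalApiFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(@Element String msg, OutputReceiver<String> out) {
      // ... call the third-party API here ...
      out.output(msg);
    }
  }

  static void build(Pipeline p, String subscription) {
    p.apply("ReadFromPubSub", PubsubIO.readStrings().fromSubscription(subscription))
        // Redistribute elements so the read and the API call land in separate stages.
        .apply("BreakFusion", Reshuffle.viaRandomKey())
        .apply("CallApi", ParDo.of(new CallExternalApiFn()));
  }
}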
In GCP I have a Dataflow job that copies files from Cloud Storage into BigQuery, and I want to delete those files once they have been successfully inserted into BigQuery. Can someone give me pointers on how to achieve this, and on how to trigger another job once the previous job has succeeded?
google-cloud-storage google-cloud-platform google-cloud-dataflow
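A hedged sketch of one way to do this from the launcher program: wait for the Dataflow job to finish, then delete the input objects with the Cloud Storage client library. The bucket, prefix, and the follow-up job trigger are placeholders:

import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;

class LoadThenCleanup {
  static void run(Pipeline pipeline) {
    // 1. Run the GCS -> BigQuery pipeline and block until it completes.
    PipelineResult.State state = pipeline.run().waitUntilFinish();

    if (state == PipelineResult.State.DONE) {
      // 2. Delete the input files that were just loaded (bucket/prefix are placeholders).
      Storage storage = StorageOptions.getDefaultInstance().getService();
      Page<Blob> blobs = storage.list("my-bucket", Storage.BlobListOption.prefix("incoming/"));
      for (Blob blob : blobs.iterateAll()) {
        blob.delete();
      }

      // 3. Trigger the next job here, e.g. launch another pipeline directly or publish
      //    a Pub/Sub message that a Cloud Function or scheduler listens to.
    }
  }
}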