Asked by Mat*_*eus · Tags: java, google-cloud-storage, google-bigquery, google-cloud-dataflow
I'm new to Google Cloud Platform, and this is my first time using Google Dataflow, for a graduate course project. What I want to do is write an automated load job that reads files from a certain bucket in my Cloud Storage and inserts their data into a BigQuery table.

I get the data as a PCollection<String>, but to insert it into BigQuery I apparently need to convert it to a PCollection<TableRow>. So far I haven't found a solid answer on how to do this.

Here is my code:
public static void main(String[] args) {
    // Defining the schema of the BigQuery table
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("Datetime").setType("TIMESTAMP"));
    fields.add(new TableFieldSchema().setName("Consumption").setType("FLOAT"));
    fields.add(new TableFieldSchema().setName("MeterID").setType("STRING"));
    TableSchema schema = new TableSchema().setFields(fields);

    // Creating the pipeline
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);

    // Getting the data from Cloud Storage
    PCollection<String> lines = p.apply(TextIO.Read.named("ReadCSVFromCloudStorage")
        .from("gs://mybucket/myfolder/certainCSVfile.csv"));

    // Probably need to do some transform here ...

    // Inserting data into BigQuery
    lines.apply(BigQueryIO.Write
        .named("WriteToBigQuery")
        .to("projectID:datasetID.tableID") // table spec format is project:dataset.table
        .withSchema(schema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    p.run();
}
I'm probably just missing something basic, so I hope you can help me out...
BigQueryIO.Write operates on a PCollection<TableRow>, as outlined in "Writing to BigQuery". You need to apply a transform that converts your PCollection<String> into a PCollection<TableRow>. As an example, take a look at this StringToRowConverter:
static class StringToRowConverter extends DoFn<String, TableRow> {
    /**
     * In this example, put the whole string into a single BigQuery field.
     */
    @Override
    public void processElement(ProcessContext c) {
        c.output(new TableRow().set("string_field", c.element()));
    }
    ...
}
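To fill the actual schema (Datetime, Consumption, MeterID) instead of a single string field, the DoFn would first split each CSV line into its columns. Below is a minimal, SDK-free sketch of just that parsing step; the column order Datetime,Consumption,MeterID and the comma delimiter are assumptions for illustration, not something stated in the original post:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CsvToRowSketch {

    // Parse one CSV line into a field-name -> value map that mirrors the
    // BigQuery schema. Assumed column order: Datetime,Consumption,MeterID.
    static Map<String, Object> parseLine(String line) {
        String[] parts = line.split(",", -1);
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("Datetime", parts[0].trim());                        // TIMESTAMP, kept as string
        row.put("Consumption", Double.parseDouble(parts[1].trim())); // FLOAT
        row.put("MeterID", parts[2].trim());                         // STRING
        return row;
    }

    public static void main(String[] args) {
        System.out.println(parseLine("2016-01-01 00:00:00,12.5,MTR-001"));
    }
}
```

Inside the real DoFn the same logic would become `c.output(new TableRow().set("Datetime", ...).set("Consumption", ...).set("MeterID", ...))`, and the transform is wired in between the read and the write with `lines.apply(ParDo.of(new StringToRowConverter()))`.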
Viewed: 1790 times