For example, suppose I have a Dataflow streaming job reading from PubSub with 5-minute windows. I understand that if I assign an element a timestamp that is more than two days old, a window containing that element will still be produced, but if I use the example described in BigQueryIO.java that writes daily tables to BigQuery, the job will write that two-day-old element to the BigQuery table for the current date.
I would like to write these past elements to BigQuery tables using the timestamp of the element's window rather than the time of the current window. Is that possible?
Right now I am following the example described in DataflowJavaSDK/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java:
PCollection<TableRow> quotes = ...
quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
    .apply(BigQueryIO.Write
        .named("Write")
        .withSchema(schema)
        .to(new SerializableFunction<BoundedWindow, String>() {
          public String apply(BoundedWindow window) {
            // Name the destination table after the start date of the daily window.
            String dayString = DateTimeFormat.forPattern("yyyy_MM_dd")
                .print(((DaysWindow) window).getStartDate());
            return "my-project:output.output_table_" + dayString;
          }
        }));
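For context, the per-element timestamp mentioned above would normally be assigned before the Window transform, for example in a DoFn via ProcessContext.outputWithTimestamp. The sketch below only illustrates that mechanism under my own assumptions (the AssignEventTimestampFn class, the "event_time" field, and the two-day skew are hypothetical, not part of my pipeline):
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Hypothetical sketch: stamp each element with its own event time before windowing,
// so that CalendarWindows.days(1) places it in the window of that timestamp.
class AssignEventTimestampFn extends DoFn<TableRow, TableRow> {
  @Override
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    // Assumed field name; replace with wherever the event time actually lives.
    Instant eventTime = Instant.parse((String) row.get("event_time"));
    c.outputWithTimestamp(row, eventTime);
  }

  @Override
  public Duration getAllowedTimestampSkew() {
    // Permit output timestamps up to two days earlier than the input timestamp.
    return Duration.standardDays(2);
  }
}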
I was reading "Creating an instance with a custom machine type", which looks very interesting, but I can't figure out how to set up a Dataflow streaming job with a custom machine type (for example, 8 CPUs and 15 GB of RAM). Is that possible?
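For reference, the worker machine type of a Dataflow job is normally set through the pipeline options. The snippet below is only a sketch under the assumption that the workerMachineType option accepts a Compute Engine custom machine type string such as custom-8-15360 (8 vCPUs, 15360 MB); whether the streaming service actually accepts such a value is exactly what I am asking:
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;

public class CustomMachineTypePipeline {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setRunner(DataflowPipelineRunner.class);
    options.setStreaming(true);
    // Assumption: GCE custom machine type format "custom-<vCPUs>-<memoryMb>",
    // here 8 vCPUs and 15 GB (15360 MB) of memory.
    options.setWorkerMachineType("custom-8-15360");

    Pipeline p = Pipeline.create(options);
    // ... apply the PubSub read, windowing, and BigQuery write here ...
    p.run();
  }
}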
I am interested in running a query that uses the BigQuery "group_concat" function. When I run the query directly in the BigQuery web interface it finishes successfully, but when I try to run the same query through Node.js I get the following error:
errors:
[ { domain: 'global',
reason: 'invalidQuery',
message: 'Function not found: group_concat at [4:3]',
locationType: 'other',
location: 'query' } ]
The code itself is not the problem, since it runs a simple query without any issues.
My query:
SELECT
st_hub_session_id,
num_requests,
group_concat( group.code, '|' ) as Codes
FROM
table.name
GROUP BY
st_hub_session_id,
group_concat
LIMIT
1000
Where could the problem be?
I run the pipeline with the following code:
PCollection<TableRow> test1 = ...
test1
    .apply(BigQueryIO.Write
        .named("test1 write")
        .to("project_name:dataset_name.test1")
        .withSchema(tableSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

PCollection<TableRow> test2 = ...
test2
    .apply(BigQueryIO.Write
        .named("test2 write")
        .to("project_name:dataset_name.test2")
        .withSchema(tableSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
If I run the pipeline and neither of the tables "test1" and "test2" exists, I get the following output:
jun 09, 2015 12:29:24 PM com.google.cloud.dataflow.sdk.util.BigQueryTableInserter tryCreateTable
INFORMACIÓN: Trying to create BigQuery table: project_name:dataset_name.test1
jun 09, 2015 12:29:27 PM com.google.cloud.dataflow.sdk.util.RetryHttpRequestInitializer$LoggingHttpBackoffUnsuccessfulResponseHandler handleResponse
ADVERTENCIA: Request failed with code 404, will NOT retry: https://www.googleapis.com/bigquery/v2/projects/pragmatic-armor-455/datasets/audit/tables/project_name:dataset_name.test2/insertAll
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
"code" : 404,
"errors" : [ {
"domain" : "global",
"message" : …