小编bsm*_*osj的帖子

是否可以根据窗口元素的时间戳动态生成BigQuery表名?

例如,如果我有一个从PubSub读取的5分钟窗口的数据流流媒体作业,我理解如果我将一个超过两天的时间戳分配给一个元素,那么将有一个带有此元素的窗口,如果我使用的示例是将每日表输出到BigQueryIO.java中描述的BigQuery,该作业将使用实际日期在BigQuery表中写入过去两天的元素.

我想将过去的元素写入BigQuery表,并使用窗口元素的时间戳而不是当前窗口的时间,是否可能?

现在我按照DataflowJavaSDK/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java中描述的示例:

    PCollection<TableRow> quotes = ...
    quotes.apply(Window.<TableRow>info(CalendarWindows.days(1)))
       .apply(BigQueryIO.Write
         .named("Write")
         .withSchema(schema)
         .to(new SerializableFunction<BoundedWindow, String>() {
               public String apply(BoundedWindow window) {
                 String dayString = DateTimeFormat.forPattern("yyyy_MM_dd").parseDateTime(
                   ((DaysWindow) window).getStartDate());
                 return "my-project:output.output_table_" + dayString;
               }
             }));
Run Code Online (Sandbox Code Playgroud)

java google-bigquery google-cloud-dataflow

5
推荐指数
1
解决办法
949
查看次数

是否可以对数据流实例使用自定义计算机?

我正在阅读“ 使用自定义机器创建实例”,这看起来确实很有趣,但是我不知道如何使用自定义机器(例如8个CPU和15GB)来设置Dataflow流作业。可能吗?

google-cloud-dataflow

5
推荐指数
1
解决办法
393
查看次数

未找到函数:group_concat

我对使用“group_concat”BigQuery 函数执行查询很感兴趣。当我直接在 BigQuery 界面中执行查询时,查询成功结束,但是当我尝试通过 Node js 执行该查询时,出现了以下错误:

errors:
   [ { domain: 'global',
       reason: 'invalidQuery',
       message: 'Function not found: group_concat at [4:3]',
       locationType: 'other',
       location: 'query' } ]
Run Code Online (Sandbox Code Playgroud)

代码不是问题,因为它执行一个简单的查询没有任何问题。

我的查询:

SELECT
  st_hub_session_id,
  num_requests,
  group_concat( group.code, '|' ) as Codes
FROM
  table.name
GROUP BY
  st_hub_session_id,
  group_concat
LIMIT
  1000
Run Code Online (Sandbox Code Playgroud)

问题可能出在哪里?

google-bigquery google-api-nodejs-client

4
推荐指数
1
解决办法
4661
查看次数

当我尝试在同一个管道执行时创建不同的BigQuery表时出错

我使用以下代码执行管道:

PCollection<TableRow> test1 = ...
test1
    .apply(BigQueryIO.Write
        .named("test1 write")
        .to("project_name:dataset_name.test1")
        .withSchema(tableSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

PCollection<TableRow> test2 = ...
test2
    .apply(BigQueryIO.Write
        .named("test2 write")
        .to("project_name:dataset_name.test2")
        .withSchema(tableSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
Run Code Online (Sandbox Code Playgroud)

如果我执行管道并且表"test1"和"test2"都不存在,我将获得以下信息:

jun 09, 2015 12:29:24 PM com.google.cloud.dataflow.sdk.util.BigQueryTableInserter tryCreateTable
INFORMACIÓN: Trying to create BigQuery table: project_name:dataset_name.test1
jun 09, 2015 12:29:27 PM com.google.cloud.dataflow.sdk.util.RetryHttpRequestInitializer$LoggingHttpBackoffUnsuccessfulResponseHandler handleResponse
ADVERTENCIA: Request failed with code 404, will NOT retry: https://www.googleapis.com/bigquery/v2/projects/pragmatic-armor-455/datasets/audit/tables/project_name:dataset_name.test2/insertAll
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : …
Run Code Online (Sandbox Code Playgroud)

google-bigquery google-cloud-dataflow

1
推荐指数
1
解决办法
325
查看次数