BigQuery-时间序列和选择“最新”记录的最有效方法

Kur*_*ile 3 google-bigquery

在我们的BQ设计中,我们有一个客户表(嵌套原始数据),该表是从我们的微服务层(消耗运动力蒸汽)中获取的事件,其中每个事件都有该事件所针对实体的最新实体快照(在更改)。我猜是一种现代的变更数据捕获。

每个事件中的最新快照都是我们填充BigQuery的方式-它将其提取并以APPEND ONLY模式加载(通过apache spark结构化连接器连接)到biqQuery中。(这与更改和更新给定ID的一行不同)

因此,仅在追加的情况下,此表的大小当然会随着时间的推移而增长-事件中每次更改的条目。但是,它很好地是一个完整的,按时间顺序排列的客户状态和变更(我们需要具备)的序列,因此是不变的。我们可以通过重播事件来重建整个仓库,例如……足够的上下文。

这样做的一个后果是,将事实加载到BigQuery中可能会导致重复(例如,如果发生火花错误并重试了一个微型批处理,则按作业加载时BQ并不是幂等的结构化流宿,或者仅仅是由于其通常可能的分布式性质)。SteamingInserts可能会在以后研究,因为它有助于重复数据删除.....

这种体系结构的结果是我需要原始时间序列数据(记住有时可能有重复项)的视图,这些视图在这些条件下会返回LATEST记录。

最新时间由客户记录(metadata.lastUpdated)上的元数据结构字段确定-带有MAX(metadata.lastUpdated)的行是最新的。这是我们的MS层所保证的。

这也是一个真实的事件时间时间戳。表ID DAY已分区,并具有_PARTITIONTIME列,但这仅是摄取时间,我无法使用它。当我可以指定要用作分区时间的列时很棒!(愿望清单)。

重复的将是具有相同客户'id'和'metadata.lastUpdated'的两行-因此MAX(metadata.lastUpdated)可能返回2行,因此我需要使用

ROW_NUMBER()OVER(PARTITION BY ....所以我可以选择rowNum = 1

在我看来,也只选择有凹痕的1行。

好的,足够多的单词/上下文(对不起),下面是我的SQL以获得最新的视图。它可以通过我的测试运行,但是当表的大小/行数变大时,我不确定这是实现结果的最有效方法,并且想知道是否有任何BigQuery棺材可能更有效/更聪明SQL要这样做吗?为什么说SQL没问题,但绝不是性能调优的专家,可以肯定,尤其是为BQ性能调优做SQL的最佳方法。

我只是希望能够将所有数据存储在一个表中,并依靠dremel引擎的功能来查询它,而不是需要多个表或进行任何过于复杂的操作。

所以我的SQL如下。注意-我的时间戳记是作为字符串提取的,因此也需要在视图中对其进行解析。

WITH
  cus_latest_watermark AS (
  SELECT
    id,
    MAX(PARSE_TIMESTAMP("%Y-%m-%dT%H:%M:%E*S%Ez", metadata.lastUpdated)) AS maxLastUpdatedTimestampUTC
  FROM
    `project.dataset.customer_refdata`
  GROUP BY
    id ),
  cust_latest_rec_dup AS (
  SELECT
    cus.*,
    ROW_NUMBER() OVER (PARTITION BY cus.id ORDER BY cus.id) AS rowNum
  FROM
    `project.dataset.customer_refdata` cus
  JOIN
    cus_latest_watermark
  ON
    cus.id = cus_latest_watermark.id
    AND PARSE_TIMESTAMP("%Y-%m-%dT%H:%M:%E*S%Ez", cus.metadata.lastUpdated) = cus_latest_watermark.maxLastUpdatedTimestampUTC)
SELECT
  cust_latest_rec_dup.* EXCEPT(rowNum)
FROM
  cust_latest_rec_dup
WHERE
  rowNum = 1
Run Code Online (Sandbox Code Playgroud)

谢谢!

Fel*_*ffa 7

我是Mikhail的忠实拥护者,并且很长一段时间以来我们一直在进行诸如OVER(ORDER)之类的查询-但是让我提出一个替代方法,因为#standardSQL使其成为可能。

该查询失败,因为它在单个分区中有太多的元素需要ORDER BY:

#standardSQL
SELECT *
FROM (
  SELECT repo.name, type, actor.id as actor, payload, created_at
    , ROW_NUMBER() OVER(PARTITION BY actor.id ORDER BY created_at DESC) rn
  FROM `githubarchive.month.201706` 
)
WHERE rn=1
ORDER BY created_at
LIMIT 100


"Error: Resources exceeded during query execution."
Run Code Online (Sandbox Code Playgroud)

同时,此查询将在15秒内运行:

#standardSQL
SELECT actor, event
FROM (
  SELECT actor.id actor, 
    ARRAY_AGG(
      STRUCT(type, repo.name, payload, created_at) 
      ORDER BY created_at DESC LIMIT 1
    ) events
  FROM `githubarchive.month.201706` 
  GROUP BY 1
), UNNEST(events) event
ORDER BY event.created_at
LIMIT 100
Run Code Online (Sandbox Code Playgroud)

这是因为允许ORDER BY删除每个GROUP BY上的所有内容(除了最高记录以外)。

如果要所有记录,则相当于SELECT *(感谢Elliott):

#standardSQL
SELECT event.* FROM (
  SELECT ARRAY_AGG(
    t ORDER BY t.created_at DESC LIMIT 1
  )[OFFSET(0)]  event
  FROM `githubarchive.month.201706` t 
  GROUP BY actor.id
)
ORDER BY created_at
LIMIT 100
Run Code Online (Sandbox Code Playgroud)

  • 如果要将结构中的所有字段都包含在“ ARRAY_AGG”中,则可以在为表赋予别名“ t”之后使用“ ARRAY_AGG((SELECT AS STRUCT t。*)...”)。 (2认同)

Mik*_*ant 5

请尝试以下 BigQuery 标准 SQL

#standardSQL
WITH cus_watermark AS (
  SELECT
    *,
    PARSE_TIMESTAMP("%Y-%m-%dT%H:%M:%E*S%Ez", metadata.lastUpdated) AS UpdatedTimestampUTC
  FROM `project.dataset.customer_refdata`
),
cust_latest_rec_dup AS (
  SELECT 
    *,
    ROW_NUMBER() OVER (PARTITION BY id ORDER BY UpdatedTimestampUTC DESC) AS rowNum
  FROM cus_watermark
)
SELECT * EXCEPT(rowNum)
FROM cust_latest_rec_dup
WHERE rowNum = 1  
Run Code Online (Sandbox Code Playgroud)

您可以使用以下虚拟数据来播放/测试此方法

#standardSQL
WITH `project.dataset.customer_refdata` AS (
  SELECT 1 AS id, '2017-07-14 16:47:27' AS lastUpdated UNION ALL
  SELECT 1, '2017-07-14 16:47:27' UNION ALL
  SELECT 1, '2017-07-14 17:47:27' UNION ALL
  SELECT 1, '2017-07-14 18:47:27' UNION ALL
  SELECT 2, '2017-07-14 16:57:27' UNION ALL
  SELECT 2, '2017-07-14 17:57:27' UNION ALL
  SELECT 2, '2017-07-14 18:57:27' 
),
cus_watermark AS (
  SELECT
    *,
    PARSE_TIMESTAMP("%Y-%m-%d %T", lastUpdated) AS UpdatedTimestampUTC
  FROM `project.dataset.customer_refdata`
),
cust_latest_rec_dup AS (
  SELECT 
    *,
    ROW_NUMBER() OVER (PARTITION BY id ORDER BY UpdatedTimestampUTC DESC) AS rowNum
  FROM cus_watermark
)
SELECT * EXCEPT(rowNum)
FROM cust_latest_rec_dup
WHERE rowNum = 1
Run Code Online (Sandbox Code Playgroud)