如果我们试图使数据适合数据仓库模型,如何在雪花合并语句中的 WHEN MATCHED THEN 上插入一行?

ali*_*990 1 sql merge snowflake-cloud-data-platform data-vault

我正在将数据加载到雪花数据仓库建模数据库中。当行的字段被更新时,模型的工作方式如下:

  1. 将此行的加载结束日期设置为等于current_timestamp()
  2. 使用新值再次将同一行添加到模型中。

merge在 JavaScript 过程中使用 Snowflake 中的命令来执行此操作:

var observarion_query = "MERGE INTO HUB_OBSERVATION AS OBS "+
"USING (SELECT DATE(T.$"+OBSERVATION_DATE+", 'DD/MM/YYYY') AS OBS_DATE, T.$"+LOCATIONS+", T.$"+SUBMISSION_TIME+" FROM "+FILE_FULL_PATH+"(FILE_FORMAT=>"+FILE_FORMAT_NAME+") T) ST "+
"ON md5(CONCAT(ST.OBS_DATE, CONCAT('CAMP', CONCAT(ST.$"+LOCATION_POSITION+", ST.$"+SUBMISSION_TIME+")))) = OBS.OBSERVATION_DATE_LOCATION_HASH_KEY "+
"WHEN MATCHED THEN UPDATE SET OBS.LOAD_END_DT = CURRENT_TIMESTAMP() "+
"WHEN NOT MATCHED THEN "+
"INSERT (OBSERVATION_DATE_LOCATION_HASH_KEY, LOAD_DT, LOAD_END_DT, RECORD_SRC, OBSERVATION_DATE, LOCATION_NAME) "+
"VALUES (md5(CONCAT(ST.OBS_DATE, CONCAT('CAMP', CONCAT(ST.$"+LOCATION_POSITION+", ST.$"+SUBMISSION_TIME+")))), current_timestamp(), NULL, 'ONA', ST.OBS_DATE, CONCAT('CAMP', ST.$"+LOCATION_POSITION+")) ";
Run Code Online (Sandbox Code Playgroud)

问题在于WHEN MATCHED THEN,我需要使用新值对同一行进行插入,但有额外的条件:

WHEN MATCHED and OBS.REVIEW_STATUS <> ST.REVIEW_STATUS THEN
// INSERT THE ROW
Run Code Online (Sandbox Code Playgroud)

我确实知道我们无法在WHEN MATCHED THEN语句内执行插入查询。

我们怎样才能找到转机来做到这一点?

Dar*_*ner 5

如果我正确理解您的问题,您希望单个源行可能导致针对目标表的 2 个操作:

  • 将 LOAD_END_DT 更新为当前时间戳(如果该键已存在于目标表中且 LOAD_END_DT = NULL,则表示它是“当前”)
  • 将包含最新信息的新行插入到目标表中

您可以通过使用 UNION ALL 将 USING 子句内的每个源行“拆分”为 2 行来实现此目的:一行用于 UPDATE,一行用于 INSERT。我通常包含一个布尔标志来区分它们(因为它们是重复的)。在 INSERT 部分中,我执行SELECT ... WHERE NOT EXISTS (SELECT 1 FROM target WHERE key = key and MD5() = MD5())以便仅在新行与之前的行不同时才插入新行当前行。我的 ON 子句对代表 UPDATE 场景的布尔值有一个过滤器。

[编辑以包含示例合并]

首先,假设以下阶段和决赛表定义:

CREATE OR REPLACE TRANSIENT TABLE T_STAGE (
  ID            INTEGER
 ,COL1          VARCHAR
 ,COL2          VARCHAR
 ,COL3          VARCHAR
)
;
CREATE OR REPLACE TRANSIENT TABLE T_FINAL (
  ID            INTEGER
 ,START_TS      TIMESTAMP_LTZ
 ,END_TS        TIMESTAMP_LTZ
 ,COL1          VARCHAR
 ,COL2          VARCHAR
 ,COL3          VARCHAR
 ,COL_MD5_HASH  VARCHAR
)
;
Run Code Online (Sandbox Code Playgroud)

此 MERGE 说明了如何使用 UNION ALL 将单个源行拆分为 2 个,以便可以对目标表应用 INSERT 和 UPDATE:

MERGE INTO T_FINAL AS TGT
  USING (
    WITH CTE_X AS (
      SELECT ID
            ,COL1
            ,COL2
            ,COL3
            ,MD5(ARRAY_TO_STRING(ARRAY_CONSTRUCT(ID, COL1, COL2, COL3), '^')) AS COL_MD5_HASH
        FROM T_STAGE
    )
    SELECT FALSE AS UPDATE_FLAG
          ,X.ID
          ,X.COL1
          ,X.COL2
          ,X.COL3
          ,X.COL_MD5_HASH
      FROM CTE_X X
     WHERE NOT EXISTS (
             SELECT 1
               FROM T_FINAL T2
              WHERE T2.COL_MD5_HASH = X.COL_MD5_HASH
           )
    UNION ALL
    SELECT TRUE AS UPDATE_FLAG
          ,X.ID
          ,X.COL1
          ,X.COL2
          ,X.COL3
          ,X.COL_MD5_HASH
      FROM CTE_X X
           JOIN T_FINAL T3
             ON T3.END_TS IS NULL
            AND T3.ID = X.ID
            AND T3.COL_MD5_HASH != X.COL_MD5_HASH
  ) AS SRC
  ON TGT.END_TS IS NULL
 AND SRC.ID = TGT.ID
 AND SRC.UPDATE_FLAG
 WHEN NOT MATCHED THEN INSERT (ID, START_TS, END_TS, COL1, COL2, COL3, COL_MD5_HASH)
   VALUES (SRC.ID, CURRENT_TIMESTAMP(), NULL, SRC.COL1, SRC.COL2, SRC.COL3, SRC.COL_MD5_HASH)
 WHEN MATCHED THEN UPDATE SET END_TS = CURRENT_TIMESTAMP()
;
Run Code Online (Sandbox Code Playgroud)

根据您的规格,有许多假设和变化。例如,如果每次 MERGE 之后清除阶段表行,则可以删除 NOT EXISTS...它只是为了避免多次插入同一阶段行。您必须进行调整以符合您的规格。这仅用于说明目的(既然您提出了要求)。