ali*_*990 1 sql merge snowflake-cloud-data-platform data-vault
我正在将数据加载到雪花数据仓库建模数据库中。当行的字段被更新时,模型的工作方式如下:
current_timestamp()。我merge在 JavaScript 过程中使用 Snowflake 中的命令来执行此操作:
var observarion_query = "MERGE INTO HUB_OBSERVATION AS OBS "+
"USING (SELECT DATE(T.$"+OBSERVATION_DATE+", 'DD/MM/YYYY') AS OBS_DATE, T.$"+LOCATIONS+", T.$"+SUBMISSION_TIME+" FROM "+FILE_FULL_PATH+"(FILE_FORMAT=>"+FILE_FORMAT_NAME+") T) ST "+
"ON md5(CONCAT(ST.OBS_DATE, CONCAT('CAMP', CONCAT(ST.$"+LOCATION_POSITION+", ST.$"+SUBMISSION_TIME+")))) = OBS.OBSERVATION_DATE_LOCATION_HASH_KEY "+
"WHEN MATCHED THEN UPDATE SET OBS.LOAD_END_DT = CURRENT_TIMESTAMP() "+
"WHEN NOT MATCHED THEN "+
"INSERT (OBSERVATION_DATE_LOCATION_HASH_KEY, LOAD_DT, LOAD_END_DT, RECORD_SRC, OBSERVATION_DATE, LOCATION_NAME) "+
"VALUES (md5(CONCAT(ST.OBS_DATE, CONCAT('CAMP', CONCAT(ST.$"+LOCATION_POSITION+", ST.$"+SUBMISSION_TIME+")))), current_timestamp(), NULL, 'ONA', ST.OBS_DATE, CONCAT('CAMP', ST.$"+LOCATION_POSITION+")) ";
Run Code Online (Sandbox Code Playgroud)
问题在于WHEN MATCHED THEN,我需要使用新值对同一行进行插入,但有额外的条件:
WHEN MATCHED and OBS.REVIEW_STATUS <> ST.REVIEW_STATUS THEN
// INSERT THE ROW
Run Code Online (Sandbox Code Playgroud)
我确实知道我们无法在WHEN MATCHED THEN语句内执行插入查询。
我们怎样才能找到转机来做到这一点?
如果我正确理解您的问题,您希望单个源行可能导致针对目标表的 2 个操作:
您可以通过使用 UNION ALL 将 USING 子句内的每个源行“拆分”为 2 行来实现此目的:一行用于 UPDATE,一行用于 INSERT。我通常包含一个布尔标志来区分它们(因为它们是重复的)。在 INSERT 部分中,我执行SELECT ... WHERE NOT EXISTS (SELECT 1 FROM target WHERE key = key and MD5() = MD5())以便仅在新行与之前的行不同时才插入新行当前行。我的 ON 子句对代表 UPDATE 场景的布尔值有一个过滤器。
[编辑以包含示例合并]
首先,假设以下阶段和决赛表定义:
CREATE OR REPLACE TRANSIENT TABLE T_STAGE (
ID INTEGER
,COL1 VARCHAR
,COL2 VARCHAR
,COL3 VARCHAR
)
;
CREATE OR REPLACE TRANSIENT TABLE T_FINAL (
ID INTEGER
,START_TS TIMESTAMP_LTZ
,END_TS TIMESTAMP_LTZ
,COL1 VARCHAR
,COL2 VARCHAR
,COL3 VARCHAR
,COL_MD5_HASH VARCHAR
)
;
Run Code Online (Sandbox Code Playgroud)
此 MERGE 说明了如何使用 UNION ALL 将单个源行拆分为 2 个,以便可以对目标表应用 INSERT 和 UPDATE:
MERGE INTO T_FINAL AS TGT
USING (
WITH CTE_X AS (
SELECT ID
,COL1
,COL2
,COL3
,MD5(ARRAY_TO_STRING(ARRAY_CONSTRUCT(ID, COL1, COL2, COL3), '^')) AS COL_MD5_HASH
FROM T_STAGE
)
SELECT FALSE AS UPDATE_FLAG
,X.ID
,X.COL1
,X.COL2
,X.COL3
,X.COL_MD5_HASH
FROM CTE_X X
WHERE NOT EXISTS (
SELECT 1
FROM T_FINAL T2
WHERE T2.COL_MD5_HASH = X.COL_MD5_HASH
)
UNION ALL
SELECT TRUE AS UPDATE_FLAG
,X.ID
,X.COL1
,X.COL2
,X.COL3
,X.COL_MD5_HASH
FROM CTE_X X
JOIN T_FINAL T3
ON T3.END_TS IS NULL
AND T3.ID = X.ID
AND T3.COL_MD5_HASH != X.COL_MD5_HASH
) AS SRC
ON TGT.END_TS IS NULL
AND SRC.ID = TGT.ID
AND SRC.UPDATE_FLAG
WHEN NOT MATCHED THEN INSERT (ID, START_TS, END_TS, COL1, COL2, COL3, COL_MD5_HASH)
VALUES (SRC.ID, CURRENT_TIMESTAMP(), NULL, SRC.COL1, SRC.COL2, SRC.COL3, SRC.COL_MD5_HASH)
WHEN MATCHED THEN UPDATE SET END_TS = CURRENT_TIMESTAMP()
;
Run Code Online (Sandbox Code Playgroud)
根据您的规格,有许多假设和变化。例如,如果每次 MERGE 之后清除阶段表行,则可以删除 NOT EXISTS...它只是为了避免多次插入同一阶段行。您必须进行调整以符合您的规格。这仅用于说明目的(既然您提出了要求)。