从 Snowflake 查询 delta Lake 以读取最新版本

ebe*_*tbm 0 snowflake-cloud-data-platform delta-lake

我想使用该命令将 Databricks 上的 Delta 表中的数据加载到 Snowflake 上的表中MERGE INTO

目标是 Databricks 上的 Delta 表中的记录数量与 Snowflake 上的表中的记录数量看起来相同。

发生的问题是,由于 Delta Lake(S3 路径)有多个版本,Snowflake 会查询重复记录。

如何才能只读取最新版本的 Delta Lake?

MERGE INTO myTable as target USING (
    SELECT
    $1:DAY::TEXT AS DAY,
    $1:CHANNEL_CATEGORY::TEXT AS CHANNEL_CATEGORY,
    $1:SOURCE::TEXT AS SOURCE,
    $1:PLATFORM::TEXT AS PLATFROM,
    $1:LOB::TEXT AS LOB
    FROM @StageFilePathDeltaLake
    (FILE_FORMAT => 'sf_parquet_format')
  ) as src 
        ON target.CHANNEL_CATEGORY = src.CHANNEL_CATEGORY 
        AND target.SOURCE = src.SOURCE 
WHEN MATCHED THEN
UPDATE SET 
 DAY= src.DAY
,PLATFORM= src.PLATFORM
,LOB= src.LOB
WHEN NOT MATCHED THEN
INSERT  (
      DAY,
      CHANNEL_CATEGORY,
      SOURCE,
      PLATFORM,
      LOB
) VALUES  (
  src.DAY,
  src.CHANNEL_CATEGORY,
  src.SOURCE,
  src.PLATFORM,
  src.LOB
);
Run Code Online (Sandbox Code Playgroud)

sf_parque_format 是使用以下详细信息创建的:

create or replace file format sf_parquet_format
    type = 'parquet'
    compression = auto;
Run Code Online (Sandbox Code Playgroud)

Fel*_*ffa 5

上面的问题是 Snowflake 将其读取为 Parquet 文件,而不是 Delta 文件。

\n

解决方案很简单,明确其 Delta 性质:使用上述阶段创建外部表,然后table_format = delta从外部表查询,而不是直接从阶段查询。

\n
create external table \xe2\x80\xa6\n location=@mystage/daily/\n refresh_on_create =  false\n auto_refresh = false\n file_format = (type = parquet)\n table_format = delta; -- this one \n
Run Code Online (Sandbox Code Playgroud)\n

作为我的队友的补充说明:如果您想查看最新数据(如果 Delta Lake 已更新,则在创建外部表后),您将需要执行ALTER EXTERNAL TABLE {name} REFRESH. 您还可以在外部表顶部创建一个仅插入流,但只有在发生刷新后才会填充。

\n