ebe*_*tbm 0 snowflake-cloud-data-platform delta-lake
我想使用该命令将 Databricks 上的 Delta 表中的数据加载到 Snowflake 上的表中MERGE INTO。
目标是 Databricks 上的 Delta 表中的记录数量与 Snowflake 上的表中的记录数量看起来相同。
发生的问题是,由于 Delta Lake(S3 路径)有多个版本,Snowflake 会查询重复记录。
如何才能只读取最新版本的 Delta Lake?
MERGE INTO myTable as target USING (
SELECT
$1:DAY::TEXT AS DAY,
$1:CHANNEL_CATEGORY::TEXT AS CHANNEL_CATEGORY,
$1:SOURCE::TEXT AS SOURCE,
$1:PLATFORM::TEXT AS PLATFROM,
$1:LOB::TEXT AS LOB
FROM @StageFilePathDeltaLake
(FILE_FORMAT => 'sf_parquet_format')
) as src
ON target.CHANNEL_CATEGORY = src.CHANNEL_CATEGORY
AND target.SOURCE = src.SOURCE
WHEN MATCHED THEN
UPDATE SET
DAY= src.DAY
,PLATFORM= src.PLATFORM
,LOB= src.LOB
WHEN NOT MATCHED THEN
INSERT (
DAY,
CHANNEL_CATEGORY,
SOURCE,
PLATFORM,
LOB
) VALUES (
src.DAY,
src.CHANNEL_CATEGORY,
src.SOURCE,
src.PLATFORM,
src.LOB
);
Run Code Online (Sandbox Code Playgroud)
sf_parque_format 是使用以下详细信息创建的:
create or replace file format sf_parquet_format
type = 'parquet'
compression = auto;
Run Code Online (Sandbox Code Playgroud)
上面的问题是 Snowflake 将其读取为 Parquet 文件,而不是 Delta 文件。
\n解决方案很简单,明确其 Delta 性质:使用上述阶段创建外部表,然后table_format = delta从外部表查询,而不是直接从阶段查询。
create external table \xe2\x80\xa6\n location=@mystage/daily/\n refresh_on_create = false\n auto_refresh = false\n file_format = (type = parquet)\n table_format = delta; -- this one \nRun Code Online (Sandbox Code Playgroud)\n作为我的队友的补充说明:如果您想查看最新数据(如果 Delta Lake 已更新,则在创建外部表后),您将需要执行ALTER EXTERNAL TABLE {name} REFRESH. 您还可以在外部表顶部创建一个仅插入流,但只有在发生刷新后才会填充。
| 归档时间: |
|
| 查看次数: |
3688 次 |
| 最近记录: |