如何在使用 Snowpipe 时更新插入数据?

Rey*_*ers 3 snowflake-cloud-data-platform

语境

我非常喜欢使用 Snowpipe,但在使用它时我无法应用我的 upsert 逻辑。

这就是我的 upsert 逻辑:

create temp table temp_table (like target); 

copy into temp_table from @snowflake_stage;

begin transaction;

delete from target using temp_table 
where target.pk = temp_table.pk;

insert into target 
select * from temp_table;

end transaction;

drop table temp_table;
Run Code Online (Sandbox Code Playgroud)

然而,对于 Snowpipe,我只允许定义单个复制命令,因此我无法执行一系列命令。

我尝试过做什么

我考虑过使用任务和流,但任务似乎不支持事务(每个任务不止一个查询)。我也考虑过使用MERGE,但我必须明确定义我想要的列INSERT

例如,我不能做类似的事情(插入而不定义要插入的内容):

merge into src using temp_table on src.pk = temp_table.pk
when not matched then insert;
Run Code Online (Sandbox Code Playgroud)

在仍然使用 Snowpipe 的同时,还有其他方法可以更新插入我的数据吗?

小智 7

参考下面的使用说明

https://docs.snowflake.com/en/sql-reference/sql/create-pipe.html

Snowpipes 的用例是低延迟、小文件大小、频繁将数据加载到 Snowflake 中。它的源是外部或内部阶段支持的文件,其目标是雪花表。这个想法是数据在加载期间需要最少的转换,这是 COPY 语句支持的。换句话说,您无法在摄取时运行 MERGE INTO,这不是它的用例。一旦数据以雪花表的形式出现,您就可以按照您选择的频率在该数据之上运行任务。谷歌“Snowflake Data Pipelines”,它将突出显示 Stream 概念(Snowflake 的 CDC),该概念摄取有关登陆内容的新数据。流和任务的奇妙之处在于,您可以在单个表、共享或(最近在私人预览中添加)视图上拥有任意数量的视图。还要注意 Streams、UPSERTS v APPEND_ONLY 的模式。然后按照您想要的方式处理数据。

不过有几点需要考虑,

  • 您可以考虑使用外部表吗?根据来源的不同,性能可能会更好,因为 Snowflake 支持 parquet、csv、json、avro 和 orc
  • 如果使用外部表,那么您可以在其上放置一个 Stream,甚至是一个物化视图
  • 没有什么反对 Snowpipe 的,我们的文档对如何设置它有很好的解释步骤和指南,我们甚至还有使用自动通知服务(如 AWS SQS+SNS)部署 Snowpipe 或调用我们的 Snowpipe REST API 的教程。
  • 另请注意,MERGE into 通常用于 UPDATE 和 INSERT(并锁定目标表),并且 Snowflake 的微分区是不可变的。这意味着,尽管您看到更新,但底层发生的情况是现有微分区致力于时间旅行期间,而新的微分区处于“活动”微分区状态(不需要 TT)。根据表和您用于该表的时间旅行长度,您最终可能会得到比“活动”微分区更多的时间旅行和故障安全微分区。设计时需要考虑的一些事情,https://docs.snowflake.com/en/user-guide/tables-storage-considerations.html#managing-costs-for-large-high-churn-tables