标签: snowflake-pipe

雪花管道状态 numOutstandingMessagesOnChannel

我有一个管道,看起来设置得很好,但就是不起作用。

我跑了

select system$pipe_status('"MY_DB"."MY_SCHEMA".MY_PIPE_NAME');
Run Code Online (Sandbox Code Playgroud)

我回来的数量越来越多numOutstandingMessagesOnChannel

有人可以解释一下这是什么意思吗?

是要处理的行吗?我预计这个数字会下降吗?有没有一个数字太高了?

有什么办法/某种方法可以追踪价格上涨的原因/时间吗?

文件只是说,

numOutstandingMessagesOnChannel
Number of messages in the queue that have been queued but not received yet.
Run Code Online (Sandbox Code Playgroud)

snowflake-cloud-data-platform snowflake-pipe

6
推荐指数
1
解决办法
1059
查看次数

如何在不丢失或复制任何记录的情况下移动或更改管道

这个管理管道的页面建议了一个将管道中的 copy into 语句更改为语句的过程。

  1. 暂停管道(使用 ALTER PIPE ... SET PIPE_EXECUTION_PAUSED=true)。
  2. 查询 SYSTEM$PIPE_STATUS 函数并验证管道执行状态是否为 PAUSED 且挂起的文件计数为 0。
  3. 重新创建管道以更改定义中的 COPY 语句。选择以下任一选项: 放下管道(使用 DROP PIPE)并创建它(使用 CREATE PIPE)。重新创建管道(使用 CREATE OR REPLACE PIPE 语法)。在内部,管道被删除和创建。
  4. 再次查询SYSTEM$PIPE_STATUS 函数并验证管道执行状态是否为RUNNING。

但是,如果应该在暂停和重新创建管道之间的时间内加载文件,则此处没有刷新该间隙的步骤。即使这些步骤很快发生,我们也有丢失文件的例子。

运行ALTER PIPE REFRESH虽然会导致重复,因为复制历史记录与管道相关联。重新创建的管道没有此历史记录,将返回并重新加载所有内容。

有没有一种很好的方法来编写这样的更改以确保没有间隙或重叠?类似于获取原始管道暂停时的时间戳,然后在刷新查询中使用该时间戳?

更新:我们构建了一个完整的流程和脚本组合来处理我们的场景。完整脚本包含在下面的答案中。

snowflake-cloud-data-platform snowflake-pipe

5
推荐指数
2
解决办法
424
查看次数

如何从 Mysql 数据库获取数据到 Snowflake

有什么聪明的方法可以将我的数据从 mysql 数据库导入到雪花中吗?到目前为止我发现了两种可能的方法:

选项 1:将 Snowpipe 放在 mysql 数据库之上,管道会自动转换数据。选项 2:我手动将表转换为 csv 并将其存储在本地,然后通过暂存将其加载到雪花中。

对我来说,首先将每个表转换为 csv 似乎很奇怪。我不能直接将 sql 转储文件推送到雪花吗?我还可以在雪花中安排一些重新加载任务,以便自动触发选项 1 或选项 2 吗?

最佳 NiBeC24

snowflake-cloud-data-platform snowflake-pipe

5
推荐指数
1
解决办法
4959
查看次数