小编Dav*_*son的帖子

在 DBT 中使用 Unique_Key 中的多个列进行增量加载

对于增量模型,DBT 文档在这里说:

\n
\n

unique_key 应在模型定义中作为表示简单列的字符串或可以一起使用的单引号列名称列表提供,例如 [\'col1\', \'col2\', \xe2\x80 \xa6])

\n
\n

我已经用这个增量定义在 DBT 中构建了一个增量模型

\n
{{\n  config(\n    materialized=\'incremental\',\n    unique_key = [\'Col1\', \'Col2\', \'Col3\']\n  )\n}}\n
Run Code Online (Sandbox Code Playgroud)\n

它编译成 Snowflake 中的合并语句:

\n
{{\n  config(\n    materialized=\'incremental\',\n    unique_key = [\'Col1\', \'Col2\', \'Col3\']\n  )\n}}\n
Run Code Online (Sandbox Code Playgroud)\n

这合理地引发了一个 SQL 错误,抱怨括号:

\n
\n

SQL编译错误:语法错误第4行位于位置32意外\'[\'。第 4 行语法错误位于位置 45 意外的 \',\'。语法错误第 4 行位于位置 98 处意外的 \'[\'。第 4 行语法错误,位置 111 处出现意外的 \',\'。

\n
\n

我找不到任何其他以这种方式使用多列的好例子。(有涉及连接列的选项,我愿意接受有关最佳方法的建议,但我正在尝试找出如何使用 DBT 推荐的语法)

\n

snowflake-cloud-data-platform dbt

10
推荐指数
1
解决办法
8213
查看次数

如何在不丢失或复制任何记录的情况下移动或更改管道

这个管理管道的页面建议了一个将管道中的 copy into 语句更改为语句的过程。

  1. 暂停管道(使用 ALTER PIPE ... SET PIPE_EXECUTION_PAUSED=true)。
  2. 查询 SYSTEM$PIPE_STATUS 函数并验证管道执行状态是否为 PAUSED 且挂起的文件计数为 0。
  3. 重新创建管道以更改定义中的 COPY 语句。选择以下任一选项: 放下管道(使用 DROP PIPE)并创建它(使用 CREATE PIPE)。重新创建管道(使用 CREATE OR REPLACE PIPE 语法)。在内部,管道被删除和创建。
  4. 再次查询SYSTEM$PIPE_STATUS 函数并验证管道执行状态是否为RUNNING。

但是,如果应该在暂停和重新创建管道之间的时间内加载文件,则此处没有刷新该间隙的步骤。即使这些步骤很快发生,我们也有丢失文件的例子。

运行ALTER PIPE REFRESH虽然会导致重复,因为复制历史记录与管道相关联。重新创建的管道没有此历史记录,将返回并重新加载所有内容。

有没有一种很好的方法来编写这样的更改以确保没有间隙或重叠?类似于获取原始管道暂停时的时间戳,然后在刷新查询中使用该时间戳?

更新:我们构建了一个完整的流程和脚本组合来处理我们的场景。完整脚本包含在下面的答案中。

snowflake-cloud-data-platform snowflake-pipe

5
推荐指数
2
解决办法
424
查看次数