我目前正在使用 dbt 并尝试构建 PSA(持久暂存区域),在我看来,快照功能非常适合此目的。但是,我在源中没有时间戳,所以我必须使用“检查”策略。
对于“check_cols”,我想使用哈希值,所以我想到了 dbt_utils.surrogate_key()。
但我想计算除始终相同的两列之外的所有列的哈希值。
所以我的模型看起来像这样:
{% snapshot Item_hist %}
{{
config(
unique_key='item_id',
strategy='check',
check_cols=['diff_hash'],
target_database='PSA',
target_schema='sourceA',
alias= 'Item',
invalidate_hard_deletes=True
)
}}
select {{ dbt_utils.surrogate_key(['Tenant','ItemNo']) }} as item_id,
{{ dbt_utils.surrogate_key( dbt_utils.star(from=source('sourceA', 'item'), except=["fieldA", "fieldB"]) ) }} as diff_hash,
*
from {{ source('sourceA', 'item') }}
{% endsnapshot %}
Run Code Online (Sandbox Code Playgroud)
不幸的是,dbt_utils.surrogate_key() 无法处理 dbt_utils.star() 的返回值。我该如何继续,以便 surrogate_key() 可以根据返回值计算哈希值?
检查源代码dbt_utils.star,返回以下格式的列
`col1` as `col1`,
`col2` as `col2`,
...
Run Code Online (Sandbox Code Playgroud)
引号取决于适配器(示例中的 BigQuery),并且as包含别名 using,因为该方法可以获取suffix和prefix参数
dbt_utils.surrogate_key需要一个值列表,而不是返回的字符串dbt_utils.star
我在dbt Slack 组中使用get_columns_in_relation找到了解决方案
感谢 Lee Werner 发布了解决方案:
{%- set columns = get_columns_in_relation(ref('my_model')) -%}
{% set column_names = columns|map(attribute='name')|list %}
SELECT
{{ dbt_utils.surrogate_key( column_names )}} as HashID,
*
FROM {{ref('my_model')}}
Run Code Online (Sandbox Code Playgroud)
在您的情况下,您想要忽略某些列,因此我们可以对列表应用拒绝过滤器
{% snapshot Item_hist %}
{{
config(
unique_key='item_id',
strategy='check',
check_cols=['diff_hash'],
target_database='PSA',
target_schema='sourceA',
alias= 'Item',
invalidate_hard_deletes=True
)
}}
{% set columns = get_columns_in_relation(source('sourceA', 'item')) -%}
{% set column_names = columns|map(attribute='name')|reject("in", ['fieldA','fieldB'])|list %}
select {{ dbt_utils.surrogate_key(['Tenant','ItemNo']) }} as item_id,
{{ dbt_utils.surrogate_key(column_names) }} as diff_hash,
*
from {{ source('sourceA', 'item') }}
{% endsnapshot %}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5670 次 |
| 最近记录: |