Rei*_*ams 2 sql database list jinja2 dbt
首先,我是 dbt 的支持者!我喜欢这个工具和它的多功能性。
在阅读一些文档时,我注意到每次调用宏时我都可以对我的模式进行一些元工作。
其中之一是清理模式。
(这已根据 dbt slack 中的讨论进行了编辑)
dbt run-operation freeze这将内省所有将使用 dbt run 但使用自动生成的哈希(可能只是时间戳)编写的表。它会以我选择的模式输出这些表,并将“哈希”记录到控制台。
dbt run-operation unfreeze --args '{hash: my_hash}' 然后将继续查找使用该哈希前缀编写的表并将它们从架构中清除。
我在旧版本的 dbt 中创建了这样一个宏,它仍然适用于 0.17.1。
下面的宏item_in_list_query是tables从一个单独的宏get_tables(也在下面)中获取的列表。然后在内部连接该表列表item_in_list_query以组成所需的 SQL 查询并执行它。为了演示,还使用了一个模型item_in_list_query。
{% macro item_in_list_query() %}
{% set tables = get_tables() %}
{{ log("Tables: " ~ tables, True) }}
{% set query %}
select id
from my_tables
{% if tables -%}
where lower(table_name) in {% for t in tables -%} {{ t }} {%- endfor -%}
{%- endif -%}
{% endset %}
{{ log("query: " ~ query, True) }}
{# run_query returns agate.Table (https://agate.readthedocs.io/en/1.6.1/api/table.html). #}
{% set results = run_query(query) %}
{{ log("results: " ~ results, True) }}
{# execute is a Jinja variable that returns True when dbt is in "execute" mode i.e. True when running dbt run but False during dbt compile. #}
{% if execute %}
{# agate.table.rows is agate.MappedSequence in which data that can be accessed either by numeric index or by key. #}
{% set results_list = results.rows %}
{% else %}
{% set results_list = [] %}
{% endif %}
{{ log("results_list: " ~ results_list, True) }}
{{ return(results_list) }}
{% endmacro %}
Run Code Online (Sandbox Code Playgroud)
{% macro get_tables() %}
{%- set tables = [
('table1', 'table2')
] -%}
{{return(tables )}}
{% endmacro %}
Run Code Online (Sandbox Code Playgroud)
{%- for item in item_in_list_query() -%}
{%- if not loop.first %} UNION ALL {% endif %}
select {{ item.id }}
{%- endfor -%}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3353 次 |
| 最近记录: |