小编cmc*_*lel的帖子

参数中的 Airflow Jinja 模板

我有一个 Airflow 运算符,它允许我查询 Athena,它接受 Jinja 模板文件作为查询输入。通常,我将表/数据库名称等变量传递给模板以创建表并添加分区语句。这对于定义的字符串效果很好。

我的任务定义如下:

        db = 'sample_db'
        table = 'sample_table'
        out = 's3://sample'
        p1='2020'
        p2='1'

        add_partition_task= AWSAthenaOperator(
            task_id='add_partition',
            query='add_partition.sql',
            params={'database': db,
                    'table_name': table,
                    'p1': p1
                    'p2': p2},
            database=db,
            output_location=out
        )
Run Code Online (Sandbox Code Playgroud)

模板化的 SQL 文件如下所示:

ALTER TABLE {{ params.database }}.{{ params.table_name }} ADD IF NOT EXISTS
PARTITION (partition1= '{{ params.p1 }}', partition2 = '{{ params.p2 }}')
Run Code Online (Sandbox Code Playgroud)

这个模板效果很好。

对此的扩展是允许“partition1”和“partition2”由 jinja 模板变量定义,该变量包含从早期任务中提取的 XCOM,该任务将日期转换为财务年度和期间。使用日期作为分区是可能的,但我感兴趣的是是否可以强制 params 接受 Jinja 模板。

我想使用的代码如下所示:

        db = 'sample_db'
        table = 'sample_table'
        out = 's3://sample'
        p1 = '{{ task_instance.xcom_pull(task_ids="convert_to_partition", …
Run Code Online (Sandbox Code Playgroud)

python jinja2 airflow amazon-athena

5
推荐指数
1
解决办法
7714
查看次数

标签 统计

airflow ×1

amazon-athena ×1

jinja2 ×1

python ×1