在Apache Airflow中的自定义运算符中访问params参数

dba*_*ten 5 python python-3.x airflow

问题

我想将值列表或实际上任何值作为自定义运算符的参数传递,修改运算符中的值,然后通过{{ params }}宏在sql模板中访问这些值。

当前设置

这是我设置中的相关部分,为清晰起见略作了一些设计。

DAG定义:

from airflow import DAG
from datetime import timedelta, datetime
from acme.operators.dwh_operators import ProcessDimensionOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2019, 2, 27),
    'provide_context': True,
    'depends_on_past': True
}

dag = DAG(
    'etl',
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60),
    template_searchpath=tmpl_search_path,
    default_args=default_args,
    max_active_runs=1)

process_product_dim = ProcessDimensionOperator(
    task_id='process_product_dim',
    mysql_conn_id='mysql_dwh',
    sql='process_dimension.sql',
    database='dwh',
    col_names=[
        'id',
        'name',
        'category',
        'price',
        'available',
        'country',
    ],
    t_name='products',
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

运算符定义:

from airflow.hooks.mysql_hook import MySqlHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class ProcessDimensionOperator(BaseOperator):
    template_fields = (
        'sql',
        'parameters')
    template_ext = ('.sql',)

    @apply_defaults
    def __init__(
            self,
            sql,
            t_name,
            col_names,
            database,
            mysql_conn_id='mysql_default',
            *args, **kwargs):
        super(ProcessDimensionOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.t_name = t_name
        self.col_names = col_names
        self.database = database
        self.mysql_conn_id = mysql_conn_id
        self.parameters = parameters

    def execute(self, context):
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)

        self.params['col_names'] = self.col_names
        self.params['t_name'] = self.t_name
        self.params['match_statement'] = self.construct_match_statement(self.col_names)

        hook.run(sql=self.sql)

    def construct_match_statement(self, cols):
        map_list = map(lambda x: f'and t.{x} = s.{x}', cols[1:])

        return ' '.join(map_list)
Run Code Online (Sandbox Code Playgroud)

process_dimension.sql

create table if not exists staging.{{ params.t_name }};

select
    *
from
    source.{{ params.t_name }} as source
join
    target.{{ params.t_name }} as target
    on source.id = target.id {{ params.match_statement }}
Run Code Online (Sandbox Code Playgroud)

但这会引发错误,{{ params.t_name }}并且和{{ params.match_statement}}呈现为null。

我尝试过的

  • 在任务定义的参数中设置t_namec_name,并将paramsmap / join逻辑保留在sql模板中。这可行,但我想尽可能地将逻辑排除在模板之外
  • 传递params={xxx}super(ProcessDimensionOperator, self).__init__(params=params, *args, **kwargs)
  • 将para传入hook.run()方法parameters={xxx}并对其进行模板化,%(x)s但这会导致问题,因为它在变量周围用引号引起来,从而使各种sql语句混乱

我是python和airflow的新手,所以我很可能会丢失一些明显的东西,任何帮助将不胜感激,谢谢!

And*_*dor 5

同样在这里。我刚刚花了几个小时(几天?)找出问题的原因(上帝保佑 IPython.embed 和日志记录)。从 Airflow 1.10.3 开始,这是由 TaskInstance.render_templates() 引起的,在渲染任何 template_fields 或 template_exts 后,它不会更新 Jinja 上下文,只会更新任务属性。在这里看到它!

因此你只需要使用

{{ task.params.whatever }}

代替

{{ params.whatever }}

在您的 .sql 模板文件中。

事实上,如果 Jinja 上下文要不断更新,那么就必须注意模板的顺序和依赖关系。这是一种嵌套/递归渲染。它还可能存在性能缺陷。

另外,我不建议使用“参数”(与“参数”不同),因为它们似乎旨在作为参数传递给数据库游标,然后您将无法传递数字/整数、列名或表名,或者只是一个 SQL 片段(例如,where、having、limit...)。