dba*_*ten 5 python python-3.x airflow
我想将值列表或实际上任何值作为自定义运算符的参数传递,修改运算符中的值,然后通过{{ params }}宏在sql模板中访问这些值。
这是我设置中的相关部分,为清晰起见略作了一些设计。
DAG定义:
from airflow import DAG
from datetime import timedelta, datetime
from acme.operators.dwh_operators import ProcessDimensionOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2019, 2, 27),
'provide_context': True,
'depends_on_past': True
}
dag = DAG(
'etl',
schedule_interval=None,
dagrun_timeout=timedelta(minutes=60),
template_searchpath=tmpl_search_path,
default_args=default_args,
max_active_runs=1)
process_product_dim = ProcessDimensionOperator(
task_id='process_product_dim',
mysql_conn_id='mysql_dwh',
sql='process_dimension.sql',
database='dwh',
col_names=[
'id',
'name',
'category',
'price',
'available',
'country',
],
t_name='products',
dag=dag)
Run Code Online (Sandbox Code Playgroud)
运算符定义:
from airflow.hooks.mysql_hook import MySqlHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class ProcessDimensionOperator(BaseOperator):
template_fields = (
'sql',
'parameters')
template_ext = ('.sql',)
@apply_defaults
def __init__(
self,
sql,
t_name,
col_names,
database,
mysql_conn_id='mysql_default',
*args, **kwargs):
super(ProcessDimensionOperator, self).__init__(*args, **kwargs)
self.sql = sql
self.t_name = t_name
self.col_names = col_names
self.database = database
self.mysql_conn_id = mysql_conn_id
self.parameters = parameters
def execute(self, context):
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
self.params['col_names'] = self.col_names
self.params['t_name'] = self.t_name
self.params['match_statement'] = self.construct_match_statement(self.col_names)
hook.run(sql=self.sql)
def construct_match_statement(self, cols):
map_list = map(lambda x: f'and t.{x} = s.{x}', cols[1:])
return ' '.join(map_list)
Run Code Online (Sandbox Code Playgroud)
process_dimension.sql
create table if not exists staging.{{ params.t_name }};
select
*
from
source.{{ params.t_name }} as source
join
target.{{ params.t_name }} as target
on source.id = target.id {{ params.match_statement }}
Run Code Online (Sandbox Code Playgroud)
但这会引发错误,{{ params.t_name }}并且和{{ params.match_statement}}呈现为null。
t_name和c_name,并将paramsmap / join逻辑保留在sql模板中。这可行,但我想尽可能地将逻辑排除在模板之外params={xxx}到super(ProcessDimensionOperator, self).__init__(params=params, *args, **kwargs)hook.run()方法parameters={xxx}并对其进行模板化,%(x)s但这会导致问题,因为它在变量周围用引号引起来,从而使各种sql语句混乱我是python和airflow的新手,所以我很可能会丢失一些明显的东西,任何帮助将不胜感激,谢谢!
同样在这里。我刚刚花了几个小时(几天?)找出问题的原因(上帝保佑 IPython.embed 和日志记录)。从 Airflow 1.10.3 开始,这是由 TaskInstance.render_templates() 引起的,在渲染任何 template_fields 或 template_exts 后,它不会更新 Jinja 上下文,只会更新任务属性。在这里看到它!
因此你只需要使用
{{ task.params.whatever }}
代替
{{ params.whatever }}
在您的 .sql 模板文件中。
事实上,如果 Jinja 上下文要不断更新,那么就必须注意模板的顺序和依赖关系。这是一种嵌套/递归渲染。它还可能存在性能缺陷。
另外,我不建议使用“参数”(与“参数”不同),因为它们似乎旨在作为参数传递给数据库游标,然后您将无法传递数字/整数、列名或表名,或者只是一个 SQL 片段(例如,where、having、limit...)。
| 归档时间: |
|
| 查看次数: |
563 次 |
| 最近记录: |