我想将气流变量传递到 SQL 查询模板文件,如下所示(在 sql/test.sql 文件中):
select 'test', '{{ params.test_ds }}', '{{ test_dt }}' from test_table;
Run Code Online (Sandbox Code Playgroud)
我创建了一个继承自 PostgresOperator 的 Operator:
class EtlOperator(PostgresOperator):
template_fields = ('sql', 'test_dt', 'params')
template_ext = PostgresOperator.template_ext
@apply_defaults
def __init__(self, test_dt, params, *args, **kwargs):
super(EtlRunIdOperator, self).__init__(*args, **kwargs)
self.test_dt = test_dt
self.params = params
def execute(self, context):
super(EtlRunIdOperator, self).execute(context)
Run Code Online (Sandbox Code Playgroud)
我创建了这个任务:
test_task00 = EtlOperator(
task_id=f'test_task00',
postgres_conn_id='redshift',
sql='sql/test.sql',
params={
'test_ds': '{{ ds }}'
},
database='default',
test_dt='{{ execution_date }}',
provide_context=True, # tried without it too
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
然而,无论 params 或 test_dt 是 template_fields 的一部分,SQL 仍然不解析变量,结果如下:
INFO - Executing: select 'test', '{{ ds }}', '' from test_table;
Run Code Online (Sandbox Code Playgroud)
我的配置有什么问题吗?
| 归档时间: |
|
| 查看次数: |
1595 次 |
| 最近记录: |