Airflow Python运算符中的宏

n1r*_*r44 15 jinja2 airflow

我可以在PythonOperator中使用宏吗?我试过跟随,但我无法获得渲染的宏!

dag = DAG(
    'temp',
    default_args=default_args,
    description='temp dag',
    schedule_interval=timedelta(days=1))

def temp_def(a, b, **kwargs):
    print '{{ds}}'
    print '{{execution_date}}'
    print 'a=%s, b=%s, kwargs=%s' % (str(a), str(b), str(kwargs))

ds = '{{ ds }}'
mm = '{{ execution_date }}'

t1 = PythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    op_args=[mm , ds],
    provide_context=False,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

jhn*_*lvr 22

仅对模板化字段处理宏.要让Jinja处理此字段,请PythonOperator使用您自己的字段进行扩展.

class MyPythonOperator(PythonOperator):
    template_fields = ('templates_dict','op_args')
Run Code Online (Sandbox Code Playgroud)

我添加'templates_dict'到了template_fields因为它PythonOperator本身有这个模板化的字段: PythonOperator

现在,您应该能够在该字段中使用宏:

ds = '{{ ds }}'
mm = '{{ execution_date }}'

t1 = MyPythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    op_args=[mm , ds],
    provide_context=False,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

  • 看来这个问题现在已经修复了,并且“op_kwargs”*已*模板化。 (4认同)
  • 我们可以将此标记为正确答案吗?因为这是正确的答案 (2认同)

cr0*_*IAN 11

在我看来,更接近本机的Airflow方法是使用包含的PythonOperator并使用provide_context=True参数.

t1 = MyPythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    provide_context=True,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

现在,您可以访问kwargscallable 中的所有宏,气流元数据和任务参数

def temp_def(**kwargs):
    print 'ds={}, execution_date={}'.format((str(kwargs['ds']), str(kwargs['execution_date']))
Run Code Online (Sandbox Code Playgroud)

如果您有一些params与该任务相关联的自定义,您也可以访问这些kwargs['params']

  • 这可能是更好的方法.我的答案主要针对的是为什么宏没有被处理的具体问题. (4认同)