Python Airflow - 返回PythonOperator的结果

Tej*_*eja 8 python airflow

我编写了一个包含多个PythonOperator的DAG

task1 = af_op.PythonOperator(task_id='Data_Extraction_Environment',
                          provide_context=True,
                          python_callable=Task1, dag=dag1)

def Task1(**kwargs):
    return(kwargs['dag_run'].conf.get('file'))
Run Code Online (Sandbox Code Playgroud)

从PythonOperator我调用"Task1"方法.该方法返回一个值,该值需要传递给下一个PythonOperator.如何从"task1"变量获取值或如何获取从Task1方法返回的值?

更新 :

    def Task1(**kwargs):
          file_name = kwargs['dag_run'].conf.get[file]
          task_instance = kwargs['task_instance']
          task_instance.xcom_push(key='file', value=file_name) 
          return file_name

  t1 = PythonOperator(task_id = 'Task1',provide_context=True,python_callable=Task1,dag=dag)

  t2 =   BashOperator(
      task_id='Moving_bucket', 
      bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1',key='file') }} ',
      dag=dag,
    )

t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)

tob*_*bi6 15

您可能需要查看Airflow的XCOM:https://airflow.apache.org/concepts.html#xcoms

如果从函数返回值,则此值存储在xcom中.在您的情况下,您可以从其他Python代码中访问它:

task_instance = kwargs['task_instance']
task_instance.xcom_pull(task_ids='Task1')
Run Code Online (Sandbox Code Playgroud)

或者像这样的模板:

{{ task_instance.xcom_pull(task_ids='Task1') }}
Run Code Online (Sandbox Code Playgroud)

如果你想指定一个键,你可以进入XCOM(在任务中):

task_instance = kwargs['task_instance']
task_instance.xcom_push(key='the_key', value=my_str)
Run Code Online (Sandbox Code Playgroud)

然后你可以像这样访问它:

task_instance.xcom_pull(task_ids='my_task', key='the_key')
Run Code Online (Sandbox Code Playgroud)

编辑1

后续问题:如何将值传递给另一个PythonOperator,而不是使用另一个函数中的值 - "t2 ="BashOperator(task_id ='Moving_bucket',bash_command ='python /home/raw.py"%s" '%file_name,dag = dag)"---我想访问"Task1"返回的file_name.如何实现?

首先,在我看来,这个价值实际上没有传递给另一个,PythonOperator而是传递给了另一个BashOperator.

其次,我的答案已经涵盖了这一点.该字段bash_command是模板化的(参见template_fields源代码:https://github.com/apache/incubator-airflow/blob/master/airflow/operators/bash_operator.py).因此,我们可以使用模板化版本:

BashOperator(
  task_id='Moving_bucket', 
  bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1') }} ',
  dag=dag,
)
Run Code Online (Sandbox Code Playgroud)

编辑2

说明:Airflow的工作方式如下:它将执行Task1,然后填充xcom,然后执行下一个任务.因此,要使您的示例工作,您需要首先执行Task1,然后在Task1下游执行Moving_bucket.

由于您使用的是返回功能,因此您也可以省略key='file'from xcom_pull而不是在函数中手动设置它.