Airflow - 如何将xcom变量传递给Python函数

luc*_*axi 23 airflow

我需要引用一个由a返回的变量BashOperator.我可能做错了所以请原谅我.在我task_archive_s3_file,我需要从中获取文件名get_s3_file.任务只是打印{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}为字符串而不是值.

如果我使用bash_command,则值正确打印.

get_s3_file = PythonOperator(
    task_id='get_s3_file',
    python_callable=obj.func_get_s3_file,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag)

submit_file_to_spark = BashOperator(
    task_id='submit_file_to_spark',
    bash_command="echo 'hello world'",
    trigger_rule="all_done",
    xcom_push=True,
    dag=dag)

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
#    bash_command="echo {{ ti.xcom_pull(task_ids='submit_file_to_spark') }}",
    python_callable=obj.func_archive_s3_file,
    params={'s3_path_filename': "{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}" },
    dag=dag)

get_s3_file >> submit_file_to_spark >> task_archive_s3_file
Run Code Online (Sandbox Code Playgroud)

Dan*_*ang 30

类似的模板{{ ti.xcom_pull(...) }}只能在支持模板的参数内使用,或者在执行之前不会呈现.请参阅PythonOperatorBashOperatortemplate_fieldstemplate_ext属性.

所以,templates_dict是你用什么模板传递给你的Python运营商:

def func_archive_s3_file(**context):
    archive(context['templates_dict']['s3_path_filename'])

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=obj.func_archive_s3_file,
    provide_context=True,  # must pass this because templates_dict gets passed via context
    templates_dict={'s3_path_filename': "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}" })
Run Code Online (Sandbox Code Playgroud)

但是,在获取XCom值的情况下,另一种方法是使用TaskInstance通过上下文提供给您的对象:

def func_archive_s3_file(**context):
    archive(context['ti'].xcom_pull(task_ids='submit_file_to_spark'))

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=obj.func_archive_s3_file,
    provide_context=True,
Run Code Online (Sandbox Code Playgroud)


Aar*_*ron 20

同时提出了问题和答案,但我认为对于那些只想PythonOperator在DAG中的任务之间传递小数据对象的用户来说,这可以更明确一些.引用这个问题和这个XCom例子让我得到了以下解决方案.超级简单:

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

DAG = DAG(
  dag_id='example_dag',
  start_date=datetime.now(),
  schedule_interval='@once'
)

def push_function(**kwargs):
    ls = ['a', 'b', 'c']
    return ls

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function,
    provide_context=True,
    dag=DAG)

def pull_function(**kwargs):
    ti = kwargs['ti']
    ls = ti.xcom_pull(task_ids='push_task')
    print(ls)

pull_task = PythonOperator(
    task_id='pull_task', 
    python_callable=pull_function,
    provide_context=True,
    dag=DAG)

push_task >> pull_task
Run Code Online (Sandbox Code Playgroud)

我不确定为什么会这样,但确实如此.社区的一些问题:

  • ti这里发生了什么?内置的是**kwargs什么?
  • provide_context=True两个功能都是必要的吗?

任何使这个答案更清晰的编辑都是非常受欢迎的!

  • 这是有效的,因为任何返回值的任务都存储在xcom中[[source](https://github.com/apache/incubator-airflow/blob/1.8.2/airflow/models.py#L1392-L1394)).所以`PythonOperator`的任何返回值都保存到XCom(需要注意的事项!).提供上下文需要使用引用的`**kwargs`,我通常将其命名为`**context`.此上下文与jinja模板中的上下文相同([source](https://github.com/apache/incubator-airflow/blob/1.8.2/airflow/models.py#L1550-L1581)).Context提供了许多特定于DAG运行的有用信息. (4认同)
  • 都!要推入或拉出,您需要访问当前运行的“ TaskInstance”对象,该对象仅可通过“上下文”使用。 (2认同)

小智 6

使用相同的代码并修改了参数,例如Startdateetc。

import airflow
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

DAG = DAG(
  dag_id='simple_xcom',
  default_args=args,
#  start_date=datetime(2019, 04, 21),
  schedule_interval="@daily",
  #schedule_interval=timedelta(1)
)

def push_function(**context):
    msg='the_message'
    print("message to push: '%s'" % msg)
    task_instance = context['task_instance']
    task_instance.xcom_push(key="the_message", value=msg)

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function,
    provide_context=True,
    dag=DAG)

def pull_function(**kwargs):
    ti = kwargs['ti']
    msg = ti.xcom_pull(task_ids='push_task',key='the_message')
    print("received message: '%s'" % msg)

pull_task = PythonOperator(`enter code here`
    task_id='pull_task', 
    python_callable=pull_function,
    provide_context=True,
    dag=DAG)

push_task >> pull_task
Run Code Online (Sandbox Code Playgroud)

  • 这些名称“ ti”和“ task_instance”从何而来? (4认同)
  • @LiuWeibo请参阅气流宏:https://airflow.apache.org/code.html#macros (3认同)
  • 气流宏链接已更新:https://airflow.apache.org/docs/stable/macros-ref.html (2认同)

Nou*_*non 5

在 Airflow 2.0(2020 年 12 月发布)中,TaskFlow API使通过 XCom 变得更加容易。使用此 API,您可以简单地从用 @task 注释的函数返回值,并且它们将在幕后作为 XCom 传递。教程中的示例:

    @task()
    def extract():
        ...
        return order_data_dict
    
    @task()
    def transform(order_data_dict: dict):
        ...
        return total_order_value

    order_data = extract()
    order_summary = transform(order_data)
Run Code Online (Sandbox Code Playgroud)

在此示例中,order_data类型为XComArg。它存储任务返回的字典extract。当transform任务运行时,order_data被解包,并且任务接收存储的普通 Python 对象。