使用 xcom pull 检索从其他 dag 推送的变量

AGa*_*aur 8 python airflow

我对气流和使用“xcom_push”和“xcom_pull”功能非常陌生。

我有两个 dags d1 ,其中包含任务 t1 ,第二个 dags d2 包含任务 t2 。

现在我使用以下命令推送 dag d1 中的值:

kwargs['ti'].xcom_push(key='start_date',value=start_date)
kwargs['ti'].xcom_push(key='end_date',value=end_date)
Run Code Online (Sandbox Code Playgroud)

并使用以下命令在 dag d2 中提取相同的 start_date 和 end_date :

start_date = kwargs['ti'].xcom_pull(dag_id = 'd1', task_ids='t1',key="start_date")
end_date = kwargs['ti'].xcom_pull(dag_id = 'd1', task_ids='t2' , key="end_Date")
Run Code Online (Sandbox Code Playgroud)

但是在 xcom_pull 期间出现“NONETYPE”错误。任何人都可以帮助我如何将 dag d1 中的值提取到 dag d2 中

Ale*_*nov 10

您需要另外传递 param include_prior_dates=True,以便它可以检查以前日期的 XCom 。

:param include_prior_dates: 如果为 False,则仅返回当前execution_date 中的 XCom。如果为 True,则还会返回之前日期的 XCom。

在你的情况下看起来像这样:

start_date = kwargs['ti'].xcom_pull(dag_id = 'd1', task_ids='t1',key="start_date", include_prior_dates=True)
end_date = kwargs['ti'].xcom_pull(dag_id = 'd1', task_ids='t2' , key="end_Date", include_prior_dates=True)
Run Code Online (Sandbox Code Playgroud)


Zac*_*ack 0

你有实际设置 xcom 变量的东西吗?

尝试下面的方法:

d1

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta

args = {
    'owner': 'znovak',
    'email': ['me@me.com'],
    'depends_on_past': False,
    'email_on_retry': False,
    'start_date': datetime(2019, 11, 4)
}

dag = DAG(
    dag_id='d1',
    default_args=args,
    catchup=False,
    schedule_interval=None
    )

###############################
##### Create DAG Parameters ###
###############################
def set_xcom_params(**kwargs):
    kwargs['ti'].xcom_push(key='start_date',value=start_date)
    kwargs['ti'].xcom_push(key='end_date',value=end_date)

t1 = PythonOperator(
    task_id='t1',
    python_callable=set_xcom_params,
    dag=dag,
    provide_context=True
    )
Run Code Online (Sandbox Code Playgroud)

d2

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta

args = {
    'owner': 'znovak',
    'email': ['me@me.com'],
    'depends_on_past': False,
    'email_on_retry': False,
    'start_date': datetime(2019, 11, 4)
}

dag = DAG(
    dag_id='d2',
    default_args=args,
    catchup=False,
    schedule_interval=None
    )

def pull_xcom_params(**kwargs):
    start_date = kwargs['ti'].xcom_pull(dag_id='d1',task_ids='t1',key="start_date")
    end_date = kwargs['ti'].xcom_pull(dag_id='d1',task_ids='t1',key="end_date")
    print(start_date)
    print(end_date)

t2 = PythonOperator(
    task_id='t2',
    python_callable=pull_xcom_params,
    dag=dag,
    provide_context=True
    )
Run Code Online (Sandbox Code Playgroud)