我对气流和使用“xcom_push”和“xcom_pull”功能非常陌生。
我有两个 dags d1 ,其中包含任务 t1 ,第二个 dags d2 包含任务 t2 。
现在我使用以下命令推送 dag d1 中的值:
kwargs['ti'].xcom_push(key='start_date',value=start_date)
kwargs['ti'].xcom_push(key='end_date',value=end_date)
Run Code Online (Sandbox Code Playgroud)
并使用以下命令在 dag d2 中提取相同的 start_date 和 end_date :
start_date = kwargs['ti'].xcom_pull(dag_id = 'd1', task_ids='t1',key="start_date")
end_date = kwargs['ti'].xcom_pull(dag_id = 'd1', task_ids='t2' , key="end_Date")
Run Code Online (Sandbox Code Playgroud)
但是在 xcom_pull 期间出现“NONETYPE”错误。任何人都可以帮助我如何将 dag d1 中的值提取到 dag d2 中
Ale*_*nov 10
您需要另外传递 param include_prior_dates=True,以便它可以检查以前日期的 XCom 。
:param include_prior_dates: 如果为 False,则仅返回当前execution_date 中的 XCom。如果为 True,则还会返回之前日期的 XCom。
在你的情况下看起来像这样:
start_date = kwargs['ti'].xcom_pull(dag_id = 'd1', task_ids='t1',key="start_date", include_prior_dates=True)
end_date = kwargs['ti'].xcom_pull(dag_id = 'd1', task_ids='t2' , key="end_Date", include_prior_dates=True)
Run Code Online (Sandbox Code Playgroud)
你有实际设置 xcom 变量的东西吗?
尝试下面的方法:
d1
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta
args = {
'owner': 'znovak',
'email': ['me@me.com'],
'depends_on_past': False,
'email_on_retry': False,
'start_date': datetime(2019, 11, 4)
}
dag = DAG(
dag_id='d1',
default_args=args,
catchup=False,
schedule_interval=None
)
###############################
##### Create DAG Parameters ###
###############################
def set_xcom_params(**kwargs):
kwargs['ti'].xcom_push(key='start_date',value=start_date)
kwargs['ti'].xcom_push(key='end_date',value=end_date)
t1 = PythonOperator(
task_id='t1',
python_callable=set_xcom_params,
dag=dag,
provide_context=True
)
Run Code Online (Sandbox Code Playgroud)
d2
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta
args = {
'owner': 'znovak',
'email': ['me@me.com'],
'depends_on_past': False,
'email_on_retry': False,
'start_date': datetime(2019, 11, 4)
}
dag = DAG(
dag_id='d2',
default_args=args,
catchup=False,
schedule_interval=None
)
def pull_xcom_params(**kwargs):
start_date = kwargs['ti'].xcom_pull(dag_id='d1',task_ids='t1',key="start_date")
end_date = kwargs['ti'].xcom_pull(dag_id='d1',task_ids='t1',key="end_date")
print(start_date)
print(end_date)
t2 = PythonOperator(
task_id='t2',
python_callable=pull_xcom_params,
dag=dag,
provide_context=True
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
14684 次 |
| 最近记录: |