最近六个月以来,我一直在使用Airlfow。我非常高兴在Airflow中定义工作流程。我在以下情况下无法获得xcom值(以黄色突出显示)。
请在下面的示例代码中找到代码:
def push_function(**context):
context['ti'].xcom_push(key='reportid', value='xyz')
dummy_operator = DummyOperator(
task_id='Start',
dag=main_dag
)
push_function_task = PythonOperator(
task_id='push_function',
provide_context=True,
python_callable=push_function,
op_kwargs={},
dag=main_dag)
push_function_task .set_upstream(dummy_operator)
custom_task = CustomOperator(
dag=main_dag,
task_id='import_data',
provide_context=True,
url="http://www.google.com/{}".format("{{task_instance.xcom_pull(task_ids='push_function')}}")
)
custom_task .set_upstream(push_function_task)
Run Code Online (Sandbox Code Playgroud)
注意:1. CustomOperator是我自己的操作员,负责下载给定URL的数据
请帮我。
谢谢,萨曼斯
我相信您在推拉XCom时按键不匹配。每个XCom值都与DAG ID,任务ID和密钥相关联。如果您用report_id钥匙推动,那么也需要用它来推动。
请注意,如果未将键指定为xcom_pull(),则使用默认值return_value。这是因为如果任务返回结果,Airflow会自动将其推送到键下的XCom return_value。
这为您提供了两种解决问题的方法:
1)继续按到report_id键,并确保也从中拉出
def push_function(**context):
context['ti'].xcom_push(key='reportid', value='xyz')
...
custom_task = CustomOperator(
...
url="http://www.google.com/{}".format("{{ task_instance.xcom_pull(task_ids='push_function', key='reportid') }}")
)
Run Code Online (Sandbox Code Playgroud)
2)push_function()返回要推送到XCom的值,然后从默认键中拉出。
def push_function(**context):
return 'xyz'
...
custom_task = CustomOperator(
...
url="http://www.google.com/{}".format("{{ task_instance.xcom_pull(task_ids='push_function') }}")
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1581 次 |
| 最近记录: |