如何检查任务1失败的任务失败然后运行任务2,就像if else条件一样.
我想运行依赖任务.
Task1失败然后如何在条件如if1 ==失败然后运行task2和task3的情况下获取该错误日志.我试过,SSHHOOK但我正在寻找一个简单的解决方案.
with DAG(
'airflow',
catchup=False,
default_args={
'owner': 'abc',
'start_date': datetime(2018, 4, 17),
'schedule_interval':None,
'depends_on_past': False,
},
) as dag:
task_1 = PythonOperator(
task_id='task_1',
python_callable=do(),
)
task_2 = PythonOperator(
task_id='task_2',
python_callable=do(),
)
task_3 = PythonOperator(
task_id='task_3',
python_callable=do()
task_3.set_upstream(task_2)
task_2.set_upstream(task_1)
Run Code Online (Sandbox Code Playgroud)
由于没有代码示例,我必须假设您的DAG可能是什么样子以及您想要做什么.另外,我不明白你为什么要使用SSHHook,但是没有代码示例.所以我们走了:
创建错误任务
def t2_error_task(context):
instance = context['task_instance']
do_stuff()
Run Code Online (Sandbox Code Playgroud)
创建任务
t1_task = PythonOperator(
task_id='my_operator_t1',
python_callable=do_python_stuff,
on_failure_callback=t2_error_task,
dag=dag
)
t3_task_success = PythonOperator(
task_id='my_operator_t3',
python_callable=do_python_stuff_success,
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
然后在t1的上游设置t3:
t1_task >> t3_task_success
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2261 次 |
| 最近记录: |