如何检查任务1是否失败然后在气流中运行任务2?

gri*_*007 4 airflow

如何检查任务1失败的任务失败然后运行任务2,就像if else条件一样.

我想运行依赖任务.

Task1失败然后如何在条件如if1 ==失败然后运行task2和task3的情况下获取该错误日志.我试过,SSHHOOK但我正在寻找一个简单的解决方案.

  with DAG(
    'airflow',
    catchup=False,
    default_args={
        'owner': 'abc',
    'start_date': datetime(2018, 4, 17),
        'schedule_interval':None,
        'depends_on_past': False,
    },   
) as dag:
    task_1 = PythonOperator(
        task_id='task_1', 
        python_callable=do(),
    )
    task_2 = PythonOperator(
        task_id='task_2',
        python_callable=do(),
    )
    task_3 = PythonOperator(
        task_id='task_3',
        python_callable=do()

    task_3.set_upstream(task_2)
    task_2.set_upstream(task_1)
Run Code Online (Sandbox Code Playgroud)

tob*_*bi6 6

由于没有代码示例,我必须假设您的DAG可能是什么样子以及您想要做什么.另外,我不明白你为什么要使用SSHHook,但是没有代码示例.所以我们走了:

创建错误任务

def t2_error_task(context):

    instance = context['task_instance']
    do_stuff()
Run Code Online (Sandbox Code Playgroud)

创建任务

t1_task = PythonOperator(
    task_id='my_operator_t1',
    python_callable=do_python_stuff,
    on_failure_callback=t2_error_task,
    dag=dag
)

t3_task_success = PythonOperator(
    task_id='my_operator_t3',
    python_callable=do_python_stuff_success,
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

然后在t1的上游设置t3:

t1_task >> t3_task_success 
Run Code Online (Sandbox Code Playgroud)