气流任务引用多个先前的任务?

tre*_*nch 7 directed-acyclic-graphs airflow

有没有办法让我的任务需要完成多个仍然能够独立完成的上游任务?

  • download_fcr --> process_fcr --> load_fcr
  • download_survey --> process_survey --> load_survey

create_dashboard应该需要 load_fcr 和 load_survey 才能成功完成。

我不想强制“调查”任务链中的任何内容要求完成“fcr”任务链中的任何内容。我希望它们并行处理并且即使失败也能完成。但是,仪表板任务需要在开始之前完成加载到数据库。

fcr *-->*-->*
             \
               ---> create_dashboard
                /
survey *-->*-->*
Run Code Online (Sandbox Code Playgroud)

fla*_*max 7

您可以将任务列表传递给 set_upstream 或 set_downstream。在您的情况下,如果您特别想使用 set_upstream,您可以将您的依赖项描述为:

create_dashboard.set_upstream([load_fcr, load_survey])

load_fcr.set_upstream(process_fcr)
process_fcr.set_upstream(download_fcr)

load_survey.set_upstream(process_survey)
process_survey.set_upstream(download_survey)
Run Code Online (Sandbox Code Playgroud)

看一下airflow的源代码:即使你只将一个任务对象传递给set_upstream,它实际上在做任何事情之前都会围绕它包装一个列表。


jhn*_*lvr 5

download_fcr.set_downstream(process_fcr)
process_fcr.set_downstream(load_fcr)

download_survey.set_downstream(process_survey)
process_survey.set_downstream(load_survey)

load_survey.set_downstream(create_dashboard)
load_fcr.set_downstream(create_dashboard)
Run Code Online (Sandbox Code Playgroud)