气流:动态创建SubDag

Tua*_* Vu 5 airflow apache-airflow

我有一个用例,其中有一个客户列表。可以在列表中添加或删除客户端,客户端可以具有不同的开始日期和不同的初始参数。

我想使用气流根据每个客户端的初始开始日期回填所有数据,如果出现故障则重新运行。我正在考虑为每个客户端创建一个SubDag。这可以解决我的问题吗?

如何基于client_id动态创建SubDag?

gcb*_*son 2

您绝对可以动态创建 DAG 对象:

def make_client_dag(parent_dag, client):
  return DAG(
    '%s.client_%s' % (parent_dag.dag_id, client.name),
    start_date = client.start_date
  )
Run Code Online (Sandbox Code Playgroud)

然后,您可以在主 dag 的 SubDagOperator 中使用该方法:

for client in clients:
  SubDagOperator(
    task_id='client_%s' % client.name,
    dag=main_dag,
    subdag = make_client_dag(main_dag, client)
  )
Run Code Online (Sandbox Code Playgroud)

这将创建一个特定于集合中每个成员的子dag clients,并且每个子dag将在下次调用主dag时运行。我不确定您是否会得到您想要的回填行为。