运行时的气流动态任务

Kir*_*rst 17 python airflow airflow-scheduler

关于"动态任务"的其他问题似乎涉及在计划或设计时动态构建DAG.我有兴趣在执行期间动态地将任务添加到DAG.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

dag = DAG('test_dag', description='a test',
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 1, 1),
          catchup=False)

def make_tasks():
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1 >> du2 >> du3

p = PythonOperator(
    task_id='python_operator',
    dag=dag,
    python_callable=make_tasks)
Run Code Online (Sandbox Code Playgroud)

这种天真的实现似乎不起作用 - 虚拟任务永远不会出现在UI中.

在执行期间向DAG添加新运算符的正确方法是什么?可能吗?

dla*_*lin 5

无法在DAG执行期间修改DAG(无需进行更多工作)。

dag = DAG(...是由调度循环回升。它将有任务实例'python_operator'。该任务实例在dag运行中进行调度,并由工作程序或执行程序执行。由于Airflow DB中的DAG模型仅由调度程序更新,因此这些添加的虚拟任务将不会持久保存到DAG中,也不会计划运行。他们在工人离开时将被遗忘。除非您从调度程序中复制有关持久性和更新模型的所有代码…否则下次调度程序访问DAG文件进行解析时,该代码将被撤消,这可能是每分钟一次,每秒一次或更快,具体取决于其他多少次有DAG文件要解析。

气流实际上希望每个DAG在运行之间大致保持相同的布局。它还希望不断重新加载/解析DAG文件。因此,尽管您可以制作一个DAG文件,该文件在每次运行时根据某些外部数据动态地确定任务(最好是缓存在文件或pyc模块中,而不是像数据库查找那样缓存在网络I / O中),但您会减慢整个调度循环的速度对于所有 DAG),这不是一个好计划,因为您的图形和树形视图将使所有操作变得混乱,并且该查找将使您的调度程序解析更加繁重。

您可以使可调用对象运行每个任务…

def make_tasks(context):
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1.execute(context)
    du2.execute(context)
    du3.execute(context)

p = PythonOperator(
    provides_context=true,
Run Code Online (Sandbox Code Playgroud)

但这是顺序的,您必须弄清楚如何使用python使它们并行(使用Future?),并且如果有任何异常,则整个任务将失败。而且它被绑定到一个执行者或工人,因此不使用气流的任务分配(kubernetes,mesos,芹菜)。

使用此方法的另一种方法是添加固定数量的任务(最大数量),并使用可调用对象将不需要的任务短路或为每个任务使用xcom推送参数,从而在运行时更改其行为但不更改DAG。