动态创建任务列表

Mar*_*ler 2 airflow apache-airflow

我有一个 DAG,它是通过查询 DynamoDB 的列表创建的,对于列表中的每个项目,使用 PythonOperator 创建一个任务并将其添加到 DAG。在下面的示例中未显示,但重要的是要注意列表中的某些项目依赖于其他任务,因此我使用它set_upstream来强制执行依赖关系。

- airflow_home
  \- dags
    \- workflow.py
Run Code Online (Sandbox Code Playgroud)

工作流.py

def get_task_list():
    # ... query dynamodb ...

def run_task(task):
    # ... do stuff ...

dag = DAG(dag_id='my_dag', ...)
tasks = get_task_list()
for task in tasks:
    t = PythonOperator(
        task_id=task['id'],
        provide_context=False,
        dag=dag,
        python_callable=run_task,
        op_args=[task]
    )
Run Code Online (Sandbox Code Playgroud)

问题是workflow.py一遍又一遍地运行(每次任务运行时?),我的get_task_list()方法受到 AWS 的限制并抛出异常。

我认为这是因为无论何时run_task()被调用,它都会运行所有全局变量,workflow.py所以我尝试run_task()进入一个单独的模块,如下所示:

- airflow_home
  \- dags
    \- workflow.py
    \- mypackage
      \- __init__
      \- task.py
Run Code Online (Sandbox Code Playgroud)

但这并没有改变任何事情。我什至尝试放入get_task_list()一个用工厂函数包装的 SubDagOperator,它的行为方式仍然相同。

我的问题与这些问题有关吗?

另外,为什么会workflow.py如此频繁地运行,为什么get_task_list()当任务方法不引用workflow.py并且不依赖它时会导致单个任务失败而引发的错误?

最重要的是,并行处理列表并强制执行列表中项目之间的任何依赖关系的最佳方法是什么?

Him*_*Him 5

根据您引用的问题,气流不支持在 dag 运行时创建任务。

因此,气流会在开始运行之前定期生成完整的 DAG 定义。理想情况下,此类生成的周期应与该 DAG 的调度间隔相同。

可能是每次气流检查 dag 的变化时,它也会生成完整的 dag,导致请求过多。该时间使用airflow.cfg 中的min_file_process_interval 和dag_dir_list_interval 配置进行控制。

关于任务失败,它们失败是因为 dag 创建本身失败并且气流无法启动它们。

  • 将 `min_file_process_interval` 设置为 30 会使对 `get_task_list()` 的调用减慢到 30 秒,并且我不再受到限制。至于动态任务创建,我将尝试创建一个 dag,它将构建另一个 dag 并将其保存到 [FAQ](http://airflow.readthedocs.io/) 中提到的 `globals()[dag_id]` en/latest/faq.html?highlight=dynamic#how-can-i-create-dags-dynamically) (2认同)