基于外部文件的气流动态任务

dev*_*v ツ 6 airflow airflow-scheduler

我正在从外部文件读取元素列表并循环遍历元素以创建一系列任务。

例如,如果文件中有 2 个元素 - [A, B]。将有2个系列的任务:

A1 -> A2 ..
B1 -> B2 ...
Run Code Online (Sandbox Code Playgroud)

这种读取元素逻辑不是任何任务的一部分,而是 DAG 本身的一部分。因此,调度程序每天在读取 DAG 文件时会多次调用它。我只想在 DAG 运行时调用它。

想知道是否已经有这种用例的模式?

Nic*_*coE 17

根据您的要求,如果您正在寻找的是避免多次读取文件,但您不介意从元数据数据库中读取多次,那么您可以将用作迭代源的方法更改Variables为动态创建任务。

一个基本的示例可以是在 a 内执行文件读取PythonOperator并设置Variables稍后将用于迭代的 (相同的可调用):

样本文件.json:

{
    "cities": [ "London", "Paris", "BA", "NY" ]
}
Run Code Online (Sandbox Code Playgroud)

任务定义:

from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
import json

def _read_file():
    with open('dags/sample_file.json') as f:
        data = json.load(f)
        Variable.set(key='list_of_cities',
                     value=data['cities'], serialize_json=True)
        print('Loading Variable from file...')

def _say_hello(city_name):
    print('hello from ' + city_name)

with DAG('dynamic_tasks_from_var', schedule_interval='@once',
         start_date=days_ago(2),
         catchup=False) as dag:

    read_file = PythonOperator(
        task_id='read_file',
        python_callable=_read_file
    )

Run Code Online (Sandbox Code Playgroud)

然后您可以读取该变量并创建动态任务。(设置一个很重要default_var)。是TaskGroup可选的。

    # Top-level code
    updated_list = Variable.get('list_of_cities',
                                default_var=['default_city'],
                                deserialize_json=True)
    print(f'Updated LIST: {updated_list}')

    with TaskGroup('dynamic_tasks_group',
                   prefix_group_id=False,
                   ) as dynamic_tasks_group:

        for index, city in enumerate(updated_list):
            say_hello = PythonOperator(
                task_id=f'say_hello_from_{city}',
                python_callable=_say_hello,
                op_kwargs={'city_name': city}
            )

# DAG level dependencies
read_file >> dynamic_tasks_group
Run Code Online (Sandbox Code Playgroud)

调度程序日志中,您只会找到:

INFO - Updated LIST: ['London', 'Paris', 'BA', 'NY']
Run Code Online (Sandbox Code Playgroud)

达格图视图:

有向图视图

通过这种方法,顶层代码(因此由调度程序连续读取)是对方法的调用Variable.get()。如果您需要读取多个变量,请务必记住,建议将它们存储在一个 JSON 值中,以避免不断创建与元数据数据库的连接(本文中的示例

更新:

  • 至于 2021 年 11 月,这种方法被认为是一种“快速而肮脏”的解决方案。
  • 有效吗?是的,完全可以。是生产质量代码吗?不。
  • 它出什么问题了?每次调度程序解析文件时都会访问数据库(默认情况下每 30 秒一次),并且与 DAG 执行无关。有关 Airflow 最佳实践、顶级代码的完整详细信息。
  • 如何改进?考虑有关动态 DAG 生成的任何推荐方法是否适合您的需求。

  • 我发现了许多关于动态任务生成的文章和讨论(其中大多数来自过时的版本),但总的来说,所有这些最终都提出了相同的两种方法:从文件中读取或从变量中读取,然后迭代并创建任务。我不是在谈论动态创建 DAG,我的意思是根据同一 DAG 中先前任务的结果创建任务。我什至从 Airflow 的一位核心提交者那里找到了[答案](/sf/answers/3758032211/),建议采用这种方法。无论如何,如果有人知道更好的方法来实现这一目标,请给我打电话! (2认同)
  • 嘿@MehdiLAMRANI,我刚刚更新了答案,包括提到了 Airflow 文档的更新最佳实践部分。该主题在那里得到了进一步解释,并且还提供了一些代码替代方案。 (2认同)