相关疑难解决方法(0)

Airflow 任务能否在运行时动态生成 DAG?

我有一个不规则上传的上传文件夹。对于每个上传的文件,我想生成一个特定于该文件的 DAG。

我的第一个想法是使用 FileSensor 来执行此操作,该文件传感器监视上传文件夹,并以新文件的存在为条件,触发创建单独 DAG 的任务。从概念上讲:

Sensor_DAG (FileSensor -> CreateDAGTask)

|-> File1_DAG (Task1 -> Task2 -> ...)
|-> File2_DAG (Task1 -> Task2 -> ...)
Run Code Online (Sandbox Code Playgroud)

在我最初的实现中,CreateDAGTaskPythonOperator通过将它们放置在全局命名空间中来创建 DAG 全局变量(请参阅此 SO 答案),如下所示:

Sensor_DAG (FileSensor -> CreateDAGTask)

|-> File1_DAG (Task1 -> Task2 -> ...)
|-> File2_DAG (Task1 -> Task2 -> ...)
Run Code Online (Sandbox Code Playgroud)

主 DAG 然后通过一个调用这个逻辑PythonOperator

# File-sensing DAG
default_args = {
    "depends_on_past" : False,
    "start_date"      : datetime(2020, 7, 16),
    "retries"         : 1,
    "retry_delay"     : timedelta(hours=5),
} …
Run Code Online (Sandbox Code Playgroud)

airflow airflow-scheduler

7
推荐指数
1
解决办法
6257
查看次数

标签 统计

airflow ×1

airflow-scheduler ×1