我有一个不规则上传的上传文件夹。对于每个上传的文件,我想生成一个特定于该文件的 DAG。
我的第一个想法是使用 FileSensor 来执行此操作,该文件传感器监视上传文件夹,并以新文件的存在为条件,触发创建单独 DAG 的任务。从概念上讲:
Sensor_DAG (FileSensor -> CreateDAGTask)
|-> File1_DAG (Task1 -> Task2 -> ...)
|-> File2_DAG (Task1 -> Task2 -> ...)
Run Code Online (Sandbox Code Playgroud)
在我最初的实现中,CreateDAGTask是PythonOperator通过将它们放置在全局命名空间中来创建 DAG 全局变量(请参阅此 SO 答案),如下所示:
Sensor_DAG (FileSensor -> CreateDAGTask)
|-> File1_DAG (Task1 -> Task2 -> ...)
|-> File2_DAG (Task1 -> Task2 -> ...)
Run Code Online (Sandbox Code Playgroud)
主 DAG 然后通过一个调用这个逻辑PythonOperator:
# File-sensing DAG
default_args = {
"depends_on_past" : False,
"start_date" : datetime(2020, 7, 16),
"retries" : 1,
"retry_delay" : timedelta(hours=5),
} …Run Code Online (Sandbox Code Playgroud)