在Airflow中是否有任何方法可以创建工作流程,以便任务数量B.*在任务A完成之前是未知的?我查看了子标记,但看起来它只能用于必须在Dag创建时确定的一组静态任务.
dag会触发工作吗?如果是这样,请你举个例子.
我有一个问题是,在任务A完成之前,无法知道计算任务C所需的任务B的数量.每个任务B.*将需要几个小时来计算,不能合并.
|---> Task B.1 --|
|---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
| .... |
|---> Task B.N --|
Run Code Online (Sandbox Code Playgroud)
我不喜欢这个解决方案,因为我必须创建一个阻塞的ExternalTaskSensor,所有的任务B.*需要2到24小时才能完成.所以我认为这不是一个可行的解决方案.当然有一种更简单的方法吗?或者Airflow不是为此而设计的?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C
Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
|-- Task B.1 --|
|-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
| .... |
|-- Task B.N --|
Run Code Online (Sandbox Code Playgroud)
我有一个不规则上传的上传文件夹。对于每个上传的文件,我想生成一个特定于该文件的 DAG。
我的第一个想法是使用 FileSensor 来执行此操作,该文件传感器监视上传文件夹,并以新文件的存在为条件,触发创建单独 DAG 的任务。从概念上讲:
Sensor_DAG (FileSensor -> CreateDAGTask)
|-> File1_DAG (Task1 -> Task2 -> ...)
|-> File2_DAG (Task1 -> Task2 -> ...)
Run Code Online (Sandbox Code Playgroud)
在我最初的实现中,CreateDAGTask是PythonOperator通过将它们放置在全局命名空间中来创建 DAG 全局变量(请参阅此 SO 答案),如下所示:
Sensor_DAG (FileSensor -> CreateDAGTask)
|-> File1_DAG (Task1 -> Task2 -> ...)
|-> File2_DAG (Task1 -> Task2 -> ...)
Run Code Online (Sandbox Code Playgroud)
主 DAG 然后通过一个调用这个逻辑PythonOperator:
# File-sensing DAG
default_args = {
"depends_on_past" : False,
"start_date" : datetime(2020, 7, 16),
"retries" : 1,
"retry_delay" : timedelta(hours=5),
} …Run Code Online (Sandbox Code Playgroud) 我有一个简单的 python 运算符,定义如下:
loop_records = PythonOperator(
task_id = 'loop_records',
provide_context = True,
python_callable = loop_topic_records,
dag = dag
)
Run Code Online (Sandbox Code Playgroud)
这个 python 操作符调用loop_topic_records,定义如下:
def loop_topic_records(**context):
parent_dag = context['dag']
for i in range(3):
op = DummyOperator(
task_id="child_" + str(i),
dag=parent_dag
)
logging.info('Child operator ' + str(i))
loop_records >> op
Run Code Online (Sandbox Code Playgroud)
我看到代码没有引发任何错误。它甚至打印Child operator 0..2在日志中。但是,在 dag 中Graph view我没有看到子操作符,我只看到loop_records节点,就好像我的 dag 只包含一个操作符一样。那么,这有什么问题呢?我该如何解决?