假设有以下情况:
[c1, c2, c3] >> child_task
Run Code Online (Sandbox Code Playgroud)
所有c1,c2,c3和child_task是运营商和有task_id等于id1,id2,id3和child_id分别。
任务child_task也是PythonOperator与provide_context=True和python_callable=dummy_func
def dummy_func(**context):
#...
Run Code Online (Sandbox Code Playgroud)
是否可以在dummy_func(也许通过使用上下文以某种方式浏览 dag)中检索所有父母的 ID ?
在这种情况下的预期结果将是一个列表['id1', 'id2', 'id3']。
在upstream_task_ids和downstream_task_ids 性质的BaseOperator意只是为了这个目的。
from typing import List
..
parent_task_ids: List[str] = my_task.upstream_task_ids
child_task_ids: List[str] = my_task_downstream_task_ids
Run Code Online (Sandbox Code Playgroud)
但是请注意,使用此方法property,您只能获得任务的直接(上游/下游)邻居。为了获得所有祖先或后代 tasks,您可以快速编写像这样的旧图论方法BFS-类似实现
from typing import List, Set
from queue import Queue
from airflow.models import BaseOperator
def get_ancestor_tasks(my_task: BaseOperator) -> List[BaseOperator]:
ancestor_task_ids: Set[str] = set()
tasks_queue: Queue = Queue()
# determine parent tasks to begin BFS
for task in my_task.upstream_list:
tasks_queue.put(item=task)
# perform BFS
while not tasks_queue.empty():
task: BaseOperator = tasks_queue.get()
ancestor_task_ids.add(element=task.task_id)
for _task in task.upstream_list:
tasks_queue.put(item=_task)
# Convert task_ids to actual tasks
ancestor_tasks: List[BaseOperator] = [task for task in my_task.dag.tasks if task.task_id in ancestor_task_ids]
return ancestor_tasks
Run Code Online (Sandbox Code Playgroud)
上面的代码片段没有经过测试,但我相信你可以从中汲取灵感
参考
| 归档时间: |
|
| 查看次数: |
4237 次 |
| 最近记录: |