Apache Airflow - 获取所有父 task_ids

pro*_*ath 6 airflow

假设有以下情况:

[c1, c2, c3] >> child_task
Run Code Online (Sandbox Code Playgroud)

所有c1c2c3child_task是运营商和有task_id等于id1id2id3child_id分别。

任务child_task也是PythonOperatorprovide_context=Truepython_callable=dummy_func

def dummy_func(**context):
    #...
Run Code Online (Sandbox Code Playgroud)

是否可以在dummy_func(也许通过使用上下文以某种方式浏览 dag)中检索所有父母的 ID ?

在这种情况下的预期结果将是一个列表['id1', 'id2', 'id3']

y2k*_*ham 7

upstream_task_idsdownstream_task_ids 性质BaseOperator意只是为了这个目的。

from typing import List
..
parent_task_ids: List[str] = my_task.upstream_task_ids
child_task_ids: List[str] = my_task_downstream_task_ids
Run Code Online (Sandbox Code Playgroud)

但是请注意,使用此方法property,您只能获得任务的直接(上游/下游)邻居。为了获得所有祖先或后代 tasks,您可以快速编写这样的旧图论方法BFS-类似实现

from typing import List, Set
from queue import Queue
from airflow.models import BaseOperator

def get_ancestor_tasks(my_task: BaseOperator) -> List[BaseOperator]:
    ancestor_task_ids: Set[str] = set()
    tasks_queue: Queue = Queue()
    # determine parent tasks to begin BFS
    for task in my_task.upstream_list:
        tasks_queue.put(item=task)
    # perform BFS
    while not tasks_queue.empty():
        task: BaseOperator = tasks_queue.get()
        ancestor_task_ids.add(element=task.task_id)
        for _task in task.upstream_list:
            tasks_queue.put(item=_task)
    # Convert task_ids to actual tasks
    ancestor_tasks: List[BaseOperator] = [task for task in my_task.dag.tasks if task.task_id in ancestor_task_ids]
    return ancestor_tasks
Run Code Online (Sandbox Code Playgroud)

上面的代码片段没有经过测试,但我相信你可以从中汲取灵感


参考