更改动态任务中自动生成的任务名称

Bas*_*Aly 8 python airflow

我正在尝试创建一个动态工作流程,但想要更改气流自动生成的任务名称并将其分配给列表中的任务。我尝试访问上下文并手动更改,taskid但这在 UI 中的管道渲染期间也不起作用。

我的代码

    def get_the_route(router_ip, taskid):
        dev1 = junos_ops()
        dev1.open_fabric_connection()
        result = dev1.dev_handler.rpc.get_route_information(destination="10.0.0.3", normalize=True)

        logger.info("result is: {}".format(pformat(result)))
        dev1.close_fabric_connection()
       # <--do-some-logic-->
        return {"result": result}

    for dev in dev_list:
        get_the_route_dev_list.append(get_the_route(router_ip=dev, taskid=dev))
    start >> hello_task >> get_the_route_dev_list >> bye_task >> end
Run Code Online (Sandbox Code Playgroud)

生成的图

阿帕奇气流用户界面

无论如何,是否可以为动态任务指定不同的名称?我知道这可以使用PythonOperator. 但我正在尝试使用它来做到这TaskFlow API一点。

谢谢

小智 13

您可以这样做(它也适用于动态 DAG):

@task()
def foo():
    pass

with DAG(
    'test_foo',
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:
    for name in ["a", "b", "c"]:
        foo.override(task_id=name)()
Run Code Online (Sandbox Code Playgroud)

您可以在这里阅读更多信息:https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html#reusing-a-decorated-task


小智 -1

对的,这是可能的。例如:

@task(task_id="different_name")
def foo():
    pass

with DAG(
    'test_foo',
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:
    foo()
Run Code Online (Sandbox Code Playgroud)

结果是: 在此输入图像描述

  • 你明白这个问题了吗? (3认同)