我正在尝试创建一个动态工作流程,但想要更改气流自动生成的任务名称并将其分配给列表中的任务。我尝试访问上下文并手动更改,taskid但这在 UI 中的管道渲染期间也不起作用。
我的代码
def get_the_route(router_ip, taskid):
dev1 = junos_ops()
dev1.open_fabric_connection()
result = dev1.dev_handler.rpc.get_route_information(destination="10.0.0.3", normalize=True)
logger.info("result is: {}".format(pformat(result)))
dev1.close_fabric_connection()
# <--do-some-logic-->
return {"result": result}
for dev in dev_list:
get_the_route_dev_list.append(get_the_route(router_ip=dev, taskid=dev))
start >> hello_task >> get_the_route_dev_list >> bye_task >> end
Run Code Online (Sandbox Code Playgroud)
生成的图
无论如何,是否可以为动态任务指定不同的名称?我知道这可以使用PythonOperator. 但我正在尝试使用它来做到这TaskFlow API一点。
谢谢
小智 13
您可以这样做(它也适用于动态 DAG):
@task()
def foo():
pass
with DAG(
'test_foo',
start_date=days_ago(1),
schedule_interval=None,
) as dag:
for name in ["a", "b", "c"]:
foo.override(task_id=name)()
Run Code Online (Sandbox Code Playgroud)
您可以在这里阅读更多信息:https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html#reusing-a-decorated-task
小智 -1
对的,这是可能的。例如:
@task(task_id="different_name")
def foo():
pass
with DAG(
'test_foo',
start_date=days_ago(1),
schedule_interval=None,
) as dag:
foo()
Run Code Online (Sandbox Code Playgroud)