Ita*_*vni 5 python python-3.x prefect
使用prefect,我想从其他两个流创建一个新流。
我得到的错误是A task with the slug "add_num" already exists in this flow.是否可以更新Flows使用相同tasks或Parameters. 下面是我试图完成的一个最小的例子。`
from prefect import task, Flow, Parameter
@task
def add_one(x):
return x+1
with Flow("Flow 1") as flow_1:
add_num = Parameter("add_num", default=10)
new_num1 = add_one(add_num)
@task
def add_two(y):
return y+1
with Flow("Flow 2") as flow_2:
add_num = Parameter("add_num", default=10)
new_num2 = add_two(add_num)
combo_fl = Flow("Add Numbers")
combo_fl.update(flow_1)
combo_fl.update(flow_2, validate=False)
Run Code Online (Sandbox Code Playgroud)
我确实在 slack 频道上看到了这段代码,它可能与解决这个问题有关,但我不知道如何使用它。
class GlobalParameter(Parameter):
def __init__(self, name, slug=None, *args, **kwargs):
super().__init__(name=name, *args, **kwargs)
self.slug = slug or uuid.uuid4()
Run Code Online (Sandbox Code Playgroud)
提前致谢。
由于参数在 API 中通过名称唯一标识,因此您无法组合具有相同名称的不同参数的两个流。但是,您可以做的是在每个流中使用通用参数,如下所示:
from prefect import task, Flow, Parameter
## initialize the Parameter outside of any
## Flow context
add_num = Parameter("add_num", default=10)
@task
def add_one(x):
return x+1
with Flow("Flow 1") as flow_1:
new_num1 = add_one(add_num)
@task
def add_two(y):
return y+1
with Flow("Flow 2") as flow_2:
new_num2 = add_two(add_num)
combo_fl = Flow("Add Numbers")
combo_fl.update(flow_1)
combo_fl.update(flow_2, validate=False)
Run Code Online (Sandbox Code Playgroud)
因为所使用的 Parameter 实际上是 Parameter 类的同一个实例,所以您的更新将会成功。
| 归档时间: |
|
| 查看次数: |
462 次 |
| 最近记录: |