自动注册新的完美流程?

eva*_*ois 6 python etl prefect

如果本地代理正在运行,是否有一种机制可以自动注册流/新流,而无需flow.register(...)在每个代理上手动运行?

在 Airflow 中,我相信他们有一个进程会定期扫描指定 Airflow 主文件夹中名称中的任何文件dag,然后在其中搜索 DAG 对象。如果找到它们,它就会加载它们,以便可以通过 UI 访问它们,而无需手动“注册”它们。

知府是否也存在类似的事情?因此,例如,如果我刚刚创建了以下文件 test_flow.py,而不必运行它或添加它,flow.run_agent()是否有一种方法可以让它神奇地注册并通过 UI 访问:) - 只是通过它简单地存在于正确的位置?

# prefect_home_folder/test_flow.py
import prefect
from prefect import task, Flow

@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud!")

flow = Flow("hello-flow", tasks=[hello_task])

flow.register(project_name='main')
Run Code Online (Sandbox Code Playgroud)

我可以编写一个与气流过程具有类似行为的脚本,以定期扫描文件夹并注册流,但我想知道这是否有点hacky或者是否有更好的解决方案,我只是在以下方面考虑太多空气流动?

chr*_*ite 6

很好的问题(和很棒的用户名!) - 简而言之,我建议您在气流方面考虑太多。目前在 Prefect 中无法使用此功能有以下几个原因:

  • 显式优于隐式
  • 完美的流程不限于驻留在一个地方,也不限于具有相同的运行时环境;这使得流的自动发现+从单个代理进程重新序列化它变得复杂(不需要与其提交的流共享相同的运行时环境)
  • 代理最好被认为是由部署基础设施而不是流存储进行参数化

理想情况下,对于生产工作流程,您应该使用 CI/CD 流程,以便每当您更改代码时都会触发自动作业来重新注册流程。一些可能有帮助的评论:

  • 您实际上不需要为每个可能的代码更改重新注册流程;例如,如果您更改了hello_task示例中记录的消息,您只需将流重新保存到其原始位置(这取决于您使用的存储类型)。最终,如果有关流程的任何元数据(重试设置、任务名称、依赖关系等)发生变化,您只需要重新注册。
  • 您可以使用flow.register("My Project", idempotency_key=flow.serialized_hash())自动捕获此内容;如果流程的后端表示以某种方式发生变化,此模式只会注册新版本