如何在失败时恢复 Prefect 流程而不必重新运行整个流程?

rdm*_*ony 5 python prefect

TL; 博士;

我无法使用级长的FlowRunner来解决上述问题。我可能要么用错了(见下文)要么错过了一些东西。真的很感激任何指点!


问题

我通读了出色的级长核心文档,发现处理失败本地调试的部分与此最相关(可能漏掉了一些东西!)。该FlowRunner类出现(我)是解决方案。

要查看是否可以使用 Flow Runner 恢复失败的流:

  • 运行失败的流程:
from time import sleep

import prefect
from prefect import Flow, task


@task
def success():
    sleep(3)
    return


@task
def failure():
    return 1 / 0


def get_flow_runner():
    with Flow("Success/Failure") as flow:

        success()
        failure()

    return prefect.engine.FlowRunner(flow)
Run Code Online (Sandbox Code Playgroud)
  • 在 iPython 中运行它并保存状态:
In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: state = flow_runner.run()
Run Code Online (Sandbox Code Playgroud)
  • 将 1 / 0 替换为 1 / 1failure()以便任务成功:

  • 最后将之前的状态传递给flow_runner希望它会恢复流程:

In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: flow_runner.run(task_states=state.result)
Run Code Online (Sandbox Code Playgroud)

整个流程再次运行,包括 3 秒成功的任务。

chr*_*ite 5

这里的问题是,您在每次运行时都会重建流程,这会更改任务对象。 state.result是一个字典,其键是 Task 对象 - 如果底层 Task 对象发生任何变化,它的哈希值也会发生变化。您应该使用更新的任务对象手动创建状态字典,如下所示:

from prefect.engine.state import Success

failure_task = runner.flow.get_tasks(name="failure")[0]
task_states = {failure_task: Success("Mocked success")}
Run Code Online (Sandbox Code Playgroud)

  • 你能分享一下代码吗?如果我不得不猜测您在其他示例中有多个名为“失败”的任务。仅供参考,要模拟数据通道,您可以使用“Success("Mocked data", result=42)”或任何数据。 (2认同)