Python Luigi 中的事件处理

Cam*_*tts 6 python error-handling event-handling luigi concourse

我一直在尝试将 Luigi 集成为我们的工作流处理程序。目前我们正在使用 concourse,但是我们尝试做的很多事情在 concourse 中都很麻烦,所以我们切换到 Luigi 作为我们的依赖管理器。到目前为止没有问题,工作流触发并正确执行。

当任务因任何原因失败时,问题就会出现。这种情况特别是任务的 requires 块,但是所有情况都需要处理。到目前为止,Luigi 优雅地处理了错误并将其写入 STDOUT。尽管如此,它仍然发出并退出代码 0,这意味着工作通过了。一个误报。

我一直试图让事件处理来解决这个问题,但我无法触发它,即使是一个非常简单的工作:

@luigi.Task.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    with open('/root/luigi', 'a') as f:
        f.write("we got the exception!") #testing in concourse image
    sys.exit(luigi.retcodes.retcode().unhandled_exception)

class Test(luigi.Task):
    def requires(self):
        raise Exception()
        return []

    def run(self):
        pass

    def output(self):
        return []
Run Code Online (Sandbox Code Playgroud)

然后在python shell中运行命令

luigi.run(main_task_cls=Test, local_scheduler=True)

异常被引发,但偶数不会触发或其他什么。文件未写入,退出代码仍为 0。

另外,如果它有所不同,我在 /etc/luigi/client.cfg 中有我的 luigi 配置,其中包含

[retcode]
already_running=10
missing_data=20
not_run=25
task_failed=30
scheduling_error=35
unhandled_exception=40
Run Code Online (Sandbox Code Playgroud)

我不知道为什么事件处理程序不会触发,但不知何故我需要该过程因错误而失败。

小智 3

问题似乎出在您放置“引发异常”调用的位置。如果您将其放在需要函数中 - 它基本上在您的测试任务运行方法之前运行。所以这并不是说你的测试任务失败了,而是它所依赖的任务失败了(现在是空的......)。

例如,如果您将加注移动到运行,您的代码将按照您的预期运行。

    def run(self):
       print('start')
       raise Exception()
Run Code Online (Sandbox Code Playgroud)

要处理依赖项失败的情况(在这种情况下,在 require 方法中引发异常),您可以添加另一种类型的 luigi 事件处理程序 BROKEN_TASK:luigi.Event.BROKEN_TASK。这将确保 luigi 代码发出您期望的返回代码(不同于 0)。

干杯!