我正在使用Luigi推出一些管道.我们来看一个简单的例子吧
task = myTask()
w = Worker(scheduler=CentralPlannerScheduler(), worker_processes=1)
w.add(task)
w.run()
Run Code Online (Sandbox Code Playgroud)
现在让我们说这myTask是在执行期间引发异常.我能够拥有的只是来自luigi的日志,显示异常.
有没有办法让路易吉传播它或至少恢复一个failure状态?
然后我就可以让我的程序在该状态下起作用.
谢谢.
编辑 我忘记在存储结果时指定luigi的输出是针对数据库的.如果引发异常,则不会存储任何结果,但异常不会传播到luigi.我想知道luigi是否可以选择这个.
mat*_*gus 16
来自docs:
Luigi有一个内置的事件系统,允许您注册事件的回调并从您自己的任务中触发它们.您既可以挂钩一些预先定义的事件,也可以创建自己的事件.每个事件句柄都绑定到一个Task类,并且只能从该类或其子类触发.这使您可以毫不费力地仅从特定类订阅事件(例如,对于hadoop作业).
例:
import luigi
from my_tasks import MyTask
@MyTask.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
"""Will be called directly after a failed execution
of `run` on any MyTask subclass
"""
do_something()
luigi.run()
Run Code Online (Sandbox Code Playgroud)
Luigi有很多可以选择的活动.您还可以查看此测试,以了解如何倾听并对其他事件做出反应.
| 归档时间: |
|
| 查看次数: |
2527 次 |
| 最近记录: |