Kar*_*arl 2 python celery luigi
我正在使用芹菜作为我的网络应用程序.Celery执行父任务,然后执行进一步的任务
芹菜的问题
我无法获得与luigi相关的依赖图和可视化工具,以了解我的父任务的状态
Celery不提供重启故障管道的机制,并从失败的管道开始.
这两件事我很容易从路易吉得到.
所以我想,一旦芹菜运行父任务,然后在该任务内执行Luigi pipleine.
是否会出现任何问题,即我需要根据队列大小自动调整芹菜工人.会影响到多台机器上的任何路易吉工人吗?
从未尝试过,但我认为应该可以在celery任务中调用一个luigi任务表单,就像你从python代码中执行一样:
from foobar import MyTask
from luigi import scheduler
task = MyTask(123, 'another parameter value')
sch = scheduler.CentralPlannerScheduler()
w = worker.Worker(scheduler=sch)
w.add(task)
w.run()
Run Code Online (Sandbox Code Playgroud)
关于扩展你的队列和芹菜工人:如果你有太多的芹菜工人在调用luigi任务当然需要你扩展你的luigi调度程序/守护程序,以便它可以处理API请求的数量(每次你调用一个任务就可以了你点击luigi调度程序API,每N秒 - 依赖于你的配置 - 你的任务将命中调度程序API说"我还活着",每次任务完成 - 错误或成功 - 你点击调度程序API , 等等).
所以,是的,仔细看看你的调度程序,看看它是否收到了太多的http请求,或者它的数据库是否是一个瓶颈(luigi默认使用sqlite,但你可以很容易地将它改为mysql o postgres).
更新:
从版本2.7.0开始,luigi.scheduler.CentralPlannerScheduler已经重命名luigi.scheduler.Scheduler为您在此处看到的内容,因此上面的代码现在应该是:
from foobar import MyTask
from luigi import scheduler
task = MyTask(123, 'another parameter value')
sch = scheduler.Scheduler()
w = worker.Worker(scheduler=sch)
w.add(task)
w.run()
Run Code Online (Sandbox Code Playgroud)