Celery - 运行一组具有复杂依赖关系的任务

ajd*_*ajd 5 celery celery-task django-celery

在我正在处理的应用程序中,用户可以执行"转换",其中包含"步骤".步骤可以对其他步骤具有任意数量的依赖性.我希望能够调用转换并将这些步骤作为单独的Celery任务并行执行.

理想情况下,我喜欢celery-tasktree的一些东西,除了一般的有向非循环图,而不仅仅是树,但似乎还没有这样的库存在.

首先想到的解决方案是对标准拓扑排序的并行调整 - 而不是确定满足依赖关系的步骤的线性排序,我们确定可以在开始时并行执行的整个步骤集,然后是可以在第2轮中执行的整个步骤集,依此类推.

但是,当任务占用不同的时间并且工作人员必须空闲等待较长时间运行的任务时,这不是最佳的,而现在有任务可以运行.(对于我的具体应用,此解决方案现在可能还不错,但我仍然想弄清楚如何优化它.)

https://cs.stackexchange.com/questions/2524/getting-parallel-items-in-dependency-resolution中所述,更好的方法是直接在DAG上运行 - 在每个任务完成后,检查是否有任何依赖现在可以运行任务,如果是,则运行它们.

实现这样的事情最好的方法是什么?我不清楚有一种简单的方法可以做到这一点.

据我所知,Celery的组/链/和弦原语不够灵活,不足以让我表达完整的DAG - 虽然我可能在这里错了?

我想我可以为当前任务完成后通知相关任务的任务创建一个包装器 - 我不确定处理这种通知的最佳方法是什么.访问应用程序的Django数据库并不是特别简洁,并且很难将其转换为通用库,但Celery本身并没有为此提供明显的机制.

小智 2

我也遇到了这个问题,但除了一个库之外,我真的找不到更好的解决方案或库,对于任何仍然感兴趣的人,您可以查看 https://github.com/selinon/selinon。虽然它仅适用于 python 3,但它似乎是唯一完全符合您想要的功能。

Airflow 是另一种选择,但 Airflow 与其他 dag 库一样在更静态的环境中使用。