Celery的类似信号量的机制

NFi*_*ano 7 python semaphore distributed-computing celery

我们正在为Python + Celery开发一个用于我们任务队列的分布式应用程序.

我们的应用程序要求我们通过IMAP(例如:gmail)从远程ISP下载电子邮件,我们希望能够并行完成此任务.对于给定的电子邮件帐户,您被授予限制为多个模拟连接,因此我们需要一种方法来原子地跟踪所有正在下载的帐户的活动连接.

我已经使用Redis找到了Celery的多个原子锁示例,但没有一个可以跟踪这样的有限资源池,并且所有实现我们自己的尝试都导致难以调试竞争条件,导致我们的锁定间歇性地永远不会被释放.

und*_*run 2

由于 celery 使用进程的多处理库,因此您应该能够使用进程 safe multiprocessing.Semaphore([value])

您需要预先创建信号量并将其传入,并且您可以设置一个等于您想要允许的最大并发访问数的默认值。然后在 IMAP 连接之前获取并在断开连接后释放。