Avi*_*sov 7 python queue celery gevent python-multiprocessing
什么是芹菜相当于一个multiprocessing.JoinableQueue(或gevent.queue.JoinableQueue)?
我正在寻找的功能是能够.join()从发布者那里获得Celery任务队列,等待队列中的所有任务完成.
等待初始AsyncResult或GroupResult不足够,因为队列由工作人员自己动态填充.
它可能并不完美,但这就是我最终想到的。
它基本上是JoinableQueue现有 Celery 队列之上的包装器,基于共享 Redis 计数器和列表侦听器。它要求队列名称与其路由键相同(由于 和before_task_publish信号的内部实现细节task_postrun)。
joinableceleryqueue.py:
from celery.signals import before_task_publish, task_postrun
from redis import Redis
import settings
memdb = Redis.from_url(settings.REDIS_URL)
class JoinableCeleryQueue(object):
    def __init__(self, queue):
        self.queue = queue
        self.register_queue_hooks()
    def begin(self):
        memdb.set(self.count_prop, 0)
    @property
    def count_prop(self):
        return "jqueue:%s:count" % self.queue
    @property
    def finished_prop(self):
        return "jqueue:%s:finished" % self.queue
    def task_add(self, routing_key, **kw):
        if routing_key != self.queue:
            return
        memdb.incr(self.count_prop)
    def task_done(self, task, **kw):
        if task.queue != self.queue:
            return
        memdb.decr(self.count_prop)
        if memdb.get(self.count_prop) == "0":
            memdb.rpush(self.finished_prop, 1)
    def register_queue_hooks(self):
        before_task_publish.connect(self.task_add)
        task_postrun.connect(self.task_done)
    def join(self):
        memdb.brpop(self.finished_prop)
我选择使用BRPOP而不是发布/订阅,因为我只需要一个侦听器来侦听“所有任务完成”事件(发布者)。
使用 aJoinableCeleryQueue非常简单 -begin()在将任何任务添加到队列之前,使用常规 Celery API 添加任务,.join()以等待所有任务完成。
| 归档时间: | 
 | 
| 查看次数: | 169 次 | 
| 最近记录: |