Celery/Redis同时执行多次并行任务

Nea*_*ara 7 python django redis celery

我有两个自定义任务(TaskATaskB),都继承自celery.Task.计划推出TaskA现在再逢,并TaskA推出N时间TaskB有不同的参数,每次.但由于某种原因,有时相同TaskB,具有相同的参数,同时执行两次,这会导致数据库出现不同的问题.

class TaskA(celery.Task):

    def run(self, *args, **kwargs):
        objects = MyModel.objects.filter(processed=False)\
                                 .values_list('id', flat=True)
        task_b = TaskB()
        for o in objects:
            o.apply_async(args=[o, ])

class TaskB(celery.Task):

    def run(self, obj_id, *args, **kwargs):
        obj = MyModel.objects.get(id=obj_id)
        # do some stuff with obj
Run Code Online (Sandbox Code Playgroud)

我尝试过的事情

我试图使用celery.group它希望它能解决这些问题,但我得到的只是错误,说run有2个参数而没有提供.

这就是我尝试TaskB使用的方式celery.group:

# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.s(id) for id in objects])
g.apply_async()
Run Code Online (Sandbox Code Playgroud)

我也尝试过这样:

# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.run(id) for id in objects])
g.apply_async()
Run Code Online (Sandbox Code Playgroud)

之前在那里执行任务g.apply_async().

这个问题是来自我如何启动任务还是其他什么?这是正常的行为吗?

附加信息

在我运行的本地计算机celery 3.1.13RabbitMQ 3.3.4,在服务器上celery 3.1.13运行Redis 2.8.9.在本地机器上,我看不到这样的行为,每个任务都执行一次.在服务器上,我看到1到10个这样的任务连续执行两次.

这就是我在本地机器和服务器上运行celery的方法:

celery_beat: celery -A proj beat -l info

celery1: celery -A proj worker -Q default -l info --purge -n default_worker -P eventlet -c 50

celery2: celery -A proj worker -Q long -l info --purge -n long_worker -P eventlet -c 200
Run Code Online (Sandbox Code Playgroud)

有效的解决方法

TaskB根据收到的参数引入了锁定.经过大约10个小时的测试,我看到究竟正在执行两次,但是锁可以防止数据库发生冲突.这确实解决了我的问题,但我仍然想知道它为什么会发生.

Tha*_*ing 6

您是否按照使用Celery 的Redis文档中的描述设置了fanout_prefix和?我正在将 Celery 与 Redis 一起使用,但没有遇到此问题。fanout_patterns