确保 Celery 中来自不同来源的任务的顺序

Alb*_*y87 6 python multithreading rabbitmq celery

可以问你一个关于芹菜的问题吗?

我有不同的作家每 X 分钟写一个任务。每个任务都需要完成来自同一作者的任务。系统运行良好,X 分钟 >> 几秒钟来完成任务。

但是,现在,编写者可能会同时发送两个或三个任务。显然,Celery + RabbitMQ 会将这个任务分配给不同的 worker,造成麻烦。

我已经搜索过,但我发现了有关使用锁阻塞一个工人直到另一个完成(例如使用 Redis)的响应,但这是不可能的,因为我的作者少了一些工人。

我需要 N 个作者的 N 个队列,并且 Celery 能够理解每个队列中的顺序。我将有数以千计的作家,所以我不能创造这么多的工人。

示例:ABC 作家、A1、A2... 任务,并且只有一名工人

我在“同一”时间收到 A1,A2,B1,C1,B2,C2,A3,B3,C3

Celery 应该创建队列 A (1-2-3) B (1-2-3) C (1-2-3)

和发送任务A1,然后,接下来,A2,B1,C1不重要,但不应该是A3,B2,B3,C2,C3。

希望我解释得好

谢谢!

小智 5

我认为您需要为每个队列创建一个工作人员来强制执行这样的排序。否则,工作人员将仅使用先进先出的方法来处理任务。您可以根据需要创建任意数量的队列,并配置每个工作线程从哪些队列接收消息。-Q您可以在启动工作线程时传递参数来设置其队列,如工作线程指南中所述。

celery -A my_project worker -l info -Q A
Run Code Online (Sandbox Code Playgroud)

然后,您可以使用路由指南设置全局映射来定义每个任务进入哪些队列。

CELERY_ROUTES = {
    'my_app.tasks.task_a1': {'queue': 'A'},
    'my_app.tasks.task_a2': {'queue': 'A'},
    'my_app.tasks.task_b1': {'queue': 'B'},
    'my_app.tasks.task_c1': {'queue': 'C'},
}
Run Code Online (Sandbox Code Playgroud)

或者,您可以根据调用任务指南在提交每个任务实例时指定队列。

task_a1.apply_async(queue='A')
Run Code Online (Sandbox Code Playgroud)