ala*_*uri 2 python task rabbitmq celery celerybeat
我有一台安装了 RabbitMQ 代理的服务器和两个 Celery 消费者(main1.py和main2.py),它们都连接到同一个代理。
在第一个消费者 ( main1.py ) 中,我实现了一个 Celery Beat,它在特定队列上多次发送不同的任务:
app = Celery('tasks', broker=..., backend=...)
app.conf.task_routes = (
[
('tasks.beat', {'queue': 'print-queue'}),
],
)
app.conf.beat_schedule = {
'beat-every-10-seconds': {
'task': 'tasks.beat',
'schedule': 10.0
},
}
@app.task(name='tasks.beat', bind=True)
def beat(self):
for i in range(10):
app.send_task("tasks.print", args=[i], queue="print-queue")
return None
Run Code Online (Sandbox Code Playgroud)
在第二个消费者(main2.py)中,我实现了上面所说的任务:
app = Celery('tasks', broker=..., backend=...)
app.conf.task_routes = (
[
('tasks.print', {'queue': 'print-queue'}),
],
)
@app.task(name='tasks.print', bind=True)
def print(self, name):
return name
Run Code Online (Sandbox Code Playgroud)
当我启动两个 Celery 工作人员时:
consumer1: celery worker -A main1 -Q print-queue --beat
consumer2: celery worker -A main2 -Q print-queue
Run Code Online (Sandbox Code Playgroud)
我收到这些错误:
[ERROR/MainProcess] Received unregistered task of type 'tasks.print'
Run Code Online (Sandbox Code Playgroud)
在第一个消费者身上
[ERROR/MainProcess] Received unregistered task of type 'tasks.beat'
Run Code Online (Sandbox Code Playgroud)
关于第二个消费者
是否可以在连接到同一代理的不同 Celery 应用程序上拆分任务?
提前致谢!
这就是正在发生的事情。你有两名工人A,B其中一名也恰好在运行 celerybeat(假设其中一名是B)。
task.beat到队列。这一切所做的就是在rabbit中加入一些元数据(包括任务名称)的消息。两名工人中的一名读到了这条消息。A 和 B 都在监听同一个队列,因此任何一方都可以读取它。
A。如果 A 读取该消息,它将尝试查找名为此任务的任务,tasks.beat因为 A 没有定义该任务,所以会失败。
b. 如果 B 读取该消息,它将成功尝试找到调用的任务tasks.beat(因为它确实有该任务)并将运行代码。tasks.beat将在rabbit中排队一条包含元数据的新消息tasks.print。
tasks.print,但其中任何一个都可能收到消息。在实践中,芹菜可能会做一些检查以提前抛出错误消息,但我相当确定这是根本问题。
简而言之,队列中的所有工作人员(包括beat)都应该运行相同的代码。
| 归档时间: |
|
| 查看次数: |
2898 次 |
| 最近记录: |