Ral*_*eus 7 python rabbitmq celery flask
我正在努力从 Celery 任务中获取结果。我的应用程序入口点如下所示:
from app import create_app,celery
celery.conf.task_default_queue = 'order_master'
order_app = create_app('../config.order_master.py')
Run Code Online (Sandbox Code Playgroud)
现在,在启动应用程序之前,我启动 RabbitMQ 并确保它没有队列:
root@3d2e6b124780:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
root@3d2e6b124780:/#
Run Code Online (Sandbox Code Playgroud)
现在我开始应用程序。启动后,我仍然在 RabbitMQ 中看到没有队列。当我从应用程序启动任务时,jobs.add_together.delay(2, 3)我得到任务 ID:
ralfeus@web-2 /v/w/order (multiple-instances)> (order) curl localhost/test
{"result":"a2c07de4-f9f2-4b21-ae47-c6d92f2a7dfe"}
ralfeus@web-2 /v/w/order (multiple-instances)> (order)
Run Code Online (Sandbox Code Playgroud)
那一刻我可以看到我的队列有一条消息:
root@3d2e6b124780:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
dd65ba89-cce9-3e0b-8252-c2216912a910 0
order_master 1
root@3d2e6b124780:/#
Run Code Online (Sandbox Code Playgroud)
现在我启动 Celery 工作者:
ralfeus@web-2 /v/w/order (multiple-instances)>
/usr/virtualfish/order/bin/celery -A main_order_master:celery worker --loglevel=INFO -n order_master -Q order_master --concurrency 2
INFO:app:Blueprints are registered
-------------- celery@order_master v5.0.0 (singularity)
--- ***** -----
-- ******* ---- Linux-5.4.0-51-generic-x86_64-with-glibc2.29 2020-10-22 16:38:56
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: app:0x7f374715c5b0
- ** ---------- .> transport: amqp://guest:**@172.17.0.1:5672//
- ** ---------- .> results: rpc://
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> order_master exchange=order_master(direct) key=order_master
[tasks]
. app.jobs.add_together
. app.jobs.post_purchase_orders
[2020-10-22 16:38:57,263: INFO/MainProcess] Connected to amqp://guest:**@172.17.0.1:5672//
[2020-10-22 16:38:57,304: INFO/MainProcess] mingle: searching for neighbors
[2020-10-22 16:38:58,354: INFO/MainProcess] mingle: all alone
[2020-10-22 16:38:58,375: INFO/MainProcess] celery@order_master ready.
[2020-10-22 16:38:58,377: INFO/MainProcess] Received task: app.jobs.add_together[f855bec7-307d-4570-ab04-3d036005a87b]
[2020-10-22 16:40:38,616: INFO/ForkPoolWorker-2] Task app.jobs.add_together[f855bec7-307d-4570-ab04-3d036005a87b] succeeded in 100.13561034202576s: 5
Run Code Online (Sandbox Code Playgroud)
因此,可见工作人员可以接收任务并执行它并产生结果。但是我无法得到结果。相反,当我请求结果时,我得到以下结果:
curl localhost/test/f855bec7-307d-4570-ab04-3d036005a87b
{"state":"PENDING"}
ralfeus@web-2 /v/w/order (multiple-instance)> (order)
Run Code Online (Sandbox Code Playgroud)
如果我现在检查队列,我会看到:
root@3d2e6b124780:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
dd65ba89-cce9-3e0b-8252-c2216912a910 1
65d80661-6195-3986-9fa2-e468eaab656e 0
celeryev.9ca5a092-9a0c-4bd5-935b-f5690cf9665b 0
order_master 0
celery@order_master.celery.pidbox 0
root@3d2e6b124780:/#
Run Code Online (Sandbox Code Playgroud)
我看到队列 dd65ba89-cce9-3e0b-8252-c2216912a910 有一条消息,当我检查时,该消息包含结果。那么为什么它会出现在那里以及我如何得到它?所有手册都说我只需要通过 ID 获取任务。但就我而言,任务仍处于待处理状态。
Ral*_*eus 14
根据芹菜文档:
\n\n\nRPC 结果后端 (RabbitMQ/QPid)
\nRPC 结果后端 (rpc://) 很特殊,因为它\xe2\x80\x99 并不实际存储状态,而是将它们作为消息发送。这是一个重要的区别,因为它意味着结果只能检索一次,并且只能由启动该任务的客户端检索。两个不同的\n进程不能\xe2\x80\x99等待相同的结果。
\n
因此 usingrpc://不适合稍后通过另一个请求检索结果。