I'm confused about what my configuration should look like to set up a topic exchange.
http://www.rabbitmq.com/tutorials/tutorial-five-python.html
This is what I'm trying to accomplish:
Task1 -> send to QueueOne and QueueFirehose
Task2 -> send to QueueTwo and QueueFirehose
Then:
Task1 -> consume from QueueOne
Task2 -> consume from QueueTwo
TaskFirehose -> consume from QueueFirehose
I only want Task1 to consume from QueueOne and Task2 to consume from QueueTwo.
The problem right now is that when Task1 and Task2 run, they also drain QueueFirehose, so the TaskFirehose task never executes.
Is there something wrong with my config, or am I misunderstanding something?
CELERY_QUEUES = {
    "QueueOne": {
        "exchange_type": "topic",
        "binding_key": "pipeline.one",
    },
    "QueueTwo": {
        "exchange_type": "topic",
        "binding_key": "pipeline.two",
    },
    "QueueFirehose": {
        "exchange_type": "topic",
        "binding_key": "pipeline.#",
    },
}

CELERY_ROUTES = {
    "tasks.task1": {
        "queue": 'QueueOne',
        "routing_key": 'pipeline.one',
    },
    "tasks.task2": {
        "queue": 'QueueTwo',
        "routing_key": 'pipeline.two',
    },
    "tasks.firehose": {
        'queue': 'QueueFirehose',
        "routing_key": 'pipeline.#',
    },
}
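For comparison, here is a sketch of the same three bindings written with kombu Queue/Exchange objects, all attached to one explicitly shared topic exchange (the exchange name "pipeline" is an assumption; the config above does not name an exchange):

from kombu import Exchange, Queue

# Assumed shared topic exchange; the original config does not name one.
pipeline_exchange = Exchange("pipeline", type="topic")

CELERY_QUEUES = (
    Queue("QueueOne", pipeline_exchange, routing_key="pipeline.one"),
    Queue("QueueTwo", pipeline_exchange, routing_key="pipeline.two"),
    Queue("QueueFirehose", pipeline_exchange, routing_key="pipeline.#"),
)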
Assuming you actually meant something like this:
Task1 -> send to QueueOne
Task2 -> send to QueueTwo
TaskFirehose -> send to QueueFirehose
Then:
Worker1 -> consume from QueueOne, QueueFirehose
Worker2 -> consume from QueueTwo, QueueFirehose
WorkerFirehose -> consume from QueueFirehose
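In plain celery worker terms, that layout just means starting each worker with -Q listing the queues it should consume from. A minimal sketch (proj is a placeholder app name, and the -n node names are illustrative):

$ celery worker -A proj -n Worker1@%h -Q QueueOne,QueueFirehose -l INFO
$ celery worker -A proj -n Worker2@%h -Q QueueTwo,QueueFirehose -l INFO
$ celery worker -A proj -n WorkerFirehose@%h -Q QueueFirehose -l INFO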
This may not be exactly what you meant, but I think it should cover a lot of scenarios, hopefully yours as well. Something like this should work:
# Advanced example starting 10 workers in the background:
# * Three of the workers process the images and video queue
# * Two of the workers process the data queue with loglevel DEBUG
# * The rest process the 'default' queue.
$ celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data \
    -Q default -L:4,5 DEBUG
For more options and reference, see: http://celery.readthedocs.org/en/latest/reference/celery.bin.multi.html
That example is taken straight from the docs.
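Adapted to the worker and queue names above, a rough celery multi equivalent (again with proj as a placeholder app name) might look like:

$ celery multi start Worker1 Worker2 WorkerFirehose -A proj -l INFO \
    -Q:Worker1 QueueOne,QueueFirehose \
    -Q:Worker2 QueueTwo,QueueFirehose \
    -Q:WorkerFirehose QueueFirehose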
I ran into a similar situation, but handled it slightly differently. I couldn't get celery multi to work with supervisord, so instead I created a separate program in supervisord for each worker. The workers end up in different processes anyway, so just let supervisor handle everything for you. The config file looks like:
; ==================================
; celery worker supervisor example
; ==================================
[program:Worker1]
; Set full path to celery program if using virtualenv
command=celery worker -A proj --loglevel=INFO -Q QueueOne,QueueFirehose
directory=/path/to/project
user=nobody
numprocs=1
stdout_logfile=/var/log/celery/worker1.log
stderr_logfile=/var/log/celery/worker1.log
autostart=true
autorestart=true
startsecs=10
; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600
; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true
; if rabbitmq is supervised, set its priority higher
; so it starts first
priority=998
Similarly, for Worker2 and WorkerFirehose, edit the corresponding lines to get:
[program:Worker2]
; Set full path to celery program if using virtualenv
command=celery worker -A proj --loglevel=INFO -Q QueueTwo,QueueFirehose
and
[program:WorkerFirehose]
; Set full path to celery program if using virtualenv
command=celery worker -A proj --loglevel=INFO -Q QueueFirehose
Include them all in your supervisord.conf file, and that should do it.
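Once the three program sections are in place, the standard supervisorctl commands can pick them up and start them, for example:

$ supervisorctl reread
$ supervisorctl update
$ supervisorctl status Worker1 Worker2 WorkerFirehose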