Ada*_*ing 34 python amqp celery
我对AMQP的理解是消息只有以下组件:
队列附加到交易所.消息不能具有任何队列知识.它们只是发布到交换机,然后根据交换类型和路由密钥,消息被路由到一个或多个队列.
在Celery中,推荐的路由任务方法是通过CELERY_ROUTES设置.来自文档,CELERY_ROUTES是......
路由器列表,或用于将任务路由到队列的单个路由器. http://celery.readthedocs.org/en/latest/configuration.html#message-routing
它包括一个例子......
要将任务路由到feed_tasks队列,可以在
CELERY_ROUTES设置中添加条目 :Run Code Online (Sandbox Code Playgroud)CELERY_ROUTES = { 'feeds.tasks.import_feed': { 'queue': 'feed_tasks', 'routing_key': 'feed.import', }, }
但是等一下 - 根据AMQP,消息只带有路由密钥!什么是"排队"在那里做什么?
此外,还有一个默认队列的概念.如果您调用未被捕获的任务CELERY_ROUTES,则会回退到CELERY_DEFAULT_QUEUE.但是再次 - 在AMQP中,消息不知道队列.这不应该是默认的路由密钥吗?
Mau*_*cco 16
确实,在Celery上,当你去Queues时会有一些混乱,你必须记住的一件事是队列参数是指Celery Kombu队列对象而不是直接指向AMQP队列,你可以通过阅读这个来理解这一点从文档中提取.当然,芹菜创建队列并使用相同名称进行交换的事实是队列参数使用混乱的根源.始终在文档中,您可以阅读本段:
如果您有另一个队列,但在另一个要添加的交换机上,只需指定自定义交换和交换类型:
Run Code Online (Sandbox Code Playgroud)CELERY_QUEUES = ( Queue('feed_tasks', routing_key='feed.#'), Queue('regular_tasks', routing_key='task.#'), Queue('image_tasks', exchange=Exchange('mediatasks', type='direct'), routing_key='image.compress'), )
所以这样你可以在同一个交换机上绑定2个不同的队列.在仅使用交换机和密钥路由任务后,您可以使用Routers类
class MyRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if task == 'myapp.tasks.compress_video':
return {'exchange': 'video',
'exchange_type': 'topic',
'routing_key': 'video.compress'}
return None
Run Code Online (Sandbox Code Playgroud)
更多这里http://celery.readthedocs.org/en/latest/userguide/routing.html#routers
在那里声明队列的目的是让 celery 创建这些队列并使用 RabbitMQ 设置配置。
对于较低级别的AMQP客户端,您需要首先声明队列,然后声明交换器,最后将交换器绑定到队列。稍后发布消息时,您只需发布到交易所即可。
芹菜似乎使用这个结构来自动完成它。