Mar*_*oni 8 python rabbitmq celery
我正在尝试调用一个任务并为该任务创建一个队列(如果它不存在)然后立即将该被调用的任务插入该队列.我有以下代码:
@task
def greet(name):
return "Hello %s!" % name
def run():
result = greet.delay(args=['marc'], queue='greet.1',
routing_key='greet.1')
print result.ready()
Run Code Online (Sandbox Code Playgroud)
然后我有一个自定义路由器:
class MyRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if task == 'tasks.greet':
return {'queue': kwargs['queue'],
'exchange': 'greet',
'exchange_type': 'direct',
'routing_key': kwargs['routing_key']}
return None
Run Code Online (Sandbox Code Playgroud)
这会创建一个名为的交换greet.1和一个被调用的队列,greet.1但队列是空的.应该只调用交换机greet,它知道如何将路由密钥路由greet.1到被调用的队列greet.1.
有任何想法吗?
ask*_*sol 13
执行以下操作时:
task.apply_async(queue='foo', routing_key='foobar')
Run Code Online (Sandbox Code Playgroud)
然后Celery将从CELERY_QUEUES中的'foo'队列中获取默认值,或者如果它不存在则自动使用(queue = foo,exchange = foo,routing_key = foo)创建它
因此,如果CELERY_QUEUES中不存在'foo',您将最终得到:
queues['foo'] = Queue('foo', exchange=Exchange('foo'), routing_key='foo')
Run Code Online (Sandbox Code Playgroud)
然后生产者将声明该队列,但由于您覆盖了routing_key,实际上是使用发送消息 routing_key = 'foobar'
这可能看起来很奇怪,但这种行为实际上对主题交换有用,您可以在其中发布不同的主题.
尽管你想做的事情比较困难,你可以自己创建队列并声明它,但这对自动消息发布重试不起作用.如果apply_async的队列参数可以支持自定义kombu.Queue而不是声明并将其用作目标,那会更好.也许你可以在http://github.com/celery/celery/issues上打开一个问题
| 归档时间: |
|
| 查看次数: |
6494 次 |
| 最近记录: |