Snw*_*wBr 3 python queue task celery
我是使用 celery 的新手,有一个问题。我有这个简单的任务:
@app.task(name='test_install_queue')
def test_install_queue():
return subprocess.call("exit 0",shell=True)
Run Code Online (Sandbox Code Playgroud)
我稍后会在类似的测试用例中调用此任务
result = tasks.test_default_queue.apply_async(queue="install")
Run Code Online (Sandbox Code Playgroud)
任务在队列中成功运行install(因为我在 celery 日志中看到它,并且它完成得很好。但我想知道一种以编程方式test_install_queue从存储在result.
谢谢!
编辑:
我已将任务更改为:
@app.task(name='test_install_queue',bind=True)
def test_install_queue(self):
return self.request.__dict__
Run Code Online (Sandbox Code Playgroud)
然后我使用的结果apply_async如下:
result = tasks.test_install_queue.apply_async(queue="install")
assert "install" in result.get()["hostname"]
Run Code Online (Sandbox Code Playgroud)
解决方法是工作程序(主机名)与工作程序中初始化的唯一队列具有相同的名称。
小智 7
您可以尝试以下方法:
delivery_info = app.current_task.request.delivery_info
# by default celery uses the same name for queues and exchanges
original_queue = delivery_info['exchange']
for queue in app.amqp.queues.itervalues():
if queue.exchange.name == delivery_info['exchange']
and queue.routing_key == delivery_info['routing_key']:
original_queue = queue.name
break
Run Code Online (Sandbox Code Playgroud)
该方法建立在假设您使用默认 celery 设置并且您的交换是直接的基础上。如果您需要更通用的扇出和主题交换解决方案,那么您将必须检查app.amqp.queues.
我自己刚刚面临这个问题,我真的怀疑是否需要一个复杂的解决方案,例如来自“lexabug”的已经被接受的解决方案......所以,即使 Celery 文档也没有提供有效的替代方案,我我自己使用反射进行了调查,以了解哪个对象包含我需要的信息,我想出了一个超级简单直接的解决方案。具体来说,我正在编写一个钩子,或者更好的是,用 Celery 术语写一个信号,这就是我根据任务名称检索队列名称的方式:
@signals.after_task_publish.connect()
def on_task_publish(sender=None, headers=None, body=None, **kwargs):
# "sender" is a string containing task name
# ("celery" here is the celery app)
task: Task = celery.tasks.get(sender)
# once we have the task object, we can access the "queue" property
# which contains the name of the queue
# (it' a dynamic property so don't expect support by your IDE)
queue_name: str = task.queue if task is not None else 'unknown'
Run Code Online (Sandbox Code Playgroud)
附:我正在使用芹菜 4.4