如何获取运行任务的队列 - celery

Snw*_*wBr 3 python queue task celery

我是使用 celery 的新手,有一个问题。我有这个简单的任务:

@app.task(name='test_install_queue')
def test_install_queue():
    return subprocess.call("exit 0",shell=True)
Run Code Online (Sandbox Code Playgroud)

我稍后会在类似的测试用例中调用此任务

result = tasks.test_default_queue.apply_async(queue="install")
Run Code Online (Sandbox Code Playgroud)

任务在队列中成功运行install(因为我在 celery 日志中看到它,并且它完成得很好。但我想知道一种以编程方式test_install_queue从存储在result.

谢谢!

编辑:

我已将任务更改为:

@app.task(name='test_install_queue',bind=True)
def test_install_queue(self):
    return self.request.__dict__
Run Code Online (Sandbox Code Playgroud)

然后我使用的结果apply_async如下:

result = tasks.test_install_queue.apply_async(queue="install")
assert "install" in result.get()["hostname"]
Run Code Online (Sandbox Code Playgroud)

解决方法是工作程序(主机名)与工作程序中初始化的唯一队列具有相同的名称。

小智 7

您可以尝试以下方法:

delivery_info = app.current_task.request.delivery_info
# by default celery uses the same name for queues and exchanges
original_queue = delivery_info['exchange']
for queue in app.amqp.queues.itervalues():
    if queue.exchange.name == delivery_info['exchange'] 
        and queue.routing_key == delivery_info['routing_key']:
            original_queue = queue.name
            break
Run Code Online (Sandbox Code Playgroud)

该方法建立在假设您使用默认 celery 设置并且您的交换是直接的基础上。如果您需要更通用的扇出和主题交换解决方案,那么您将必须检查app.amqp.queues.


dav*_*ode 5

我自己刚刚面临这个问题,我真的怀疑是否需要一个复杂的解决方案,例如来自“lexabug”的已经被接受的解决方案......所以,即使 Celery 文档也没有提供有效的替代方案,我我自己使用反射进行了调查,以了解哪个对象包含我需要的信息,我想出了一个超级简单直接的解决方案。具体来说,我正在编写一个钩子,或者更好的是,用 Celery 术语写一个信号,这就是我根据任务名称检索队列名称的方式:

    @signals.after_task_publish.connect()
    def on_task_publish(sender=None, headers=None, body=None, **kwargs):

        # "sender" is a string containing task name 
        # ("celery" here is the celery app)
        task: Task = celery.tasks.get(sender)

        # once we have the task object, we can access the "queue" property 
        # which contains the name of the queue 
        # (it' a dynamic property so don't expect support by your IDE)
        queue_name: str = task.queue if task is not None else 'unknown'
Run Code Online (Sandbox Code Playgroud)

附:我正在使用芹菜 4.4