Publishing messages to an Exchange in RabbitMQ using Celery

cap*_*alg 3 python django rabbitmq celery django-celery

I wrote a Django project that dispatches tasks via my_task.delay(). The problem is that as the project grows, routing tasks correctly becomes really difficult - I have started writing tasks whose only purpose is to combine other tasks, which makes the code messy after a while.

While reading the RabbitMQ documentation I found a solution that would give my project a better structure. It relies on Exchanges: an Exchange can publish a message to multiple queues, from which a Consumer can then consume it. In short:

Publish/Subscribe from the RabbitMQ documentation

The RabbitMQ documentation describes a solution using Pika, which is a lower-level RabbitMQ client than Celery.

Celery also covers this scheme in its documentation, but it does not explain how to create a producer that sends messages to an exchange, which then distributes them to different queues as shown in the picture above. It only describes how to send messages and tasks to a queue - whereas I want the distribution to be handled by the Exchange.

I found that Kombu, which Celery builds on, has functions for sending messages to an Exchange via a Producer, but I could not find any documentation on how to use this within celery-django.
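To illustrate, this is roughly what publishing through Kombu on its own looks like; the broker URL and exchange name are placeholders, not anything from my project:

import kombu

# Placeholder broker URL and exchange name, for illustration only.
connection = kombu.Connection("amqp://guest:guest@localhost:5672//")
exchange = kombu.Exchange("logs", type="fanout", durable=True)

with connection as conn:
    producer = conn.Producer()
    producer.publish(
        {"hello": "world"},   # message body
        exchange=exchange,
        routing_key="",       # fanout exchanges ignore the routing key
        declare=[exchange],   # ensure the exchange exists before publishing
        retry=True,
    )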

How can I achieve the described setup in Celery?

PS: There is already a similar question on StackOverflow which suggests using Celery primitives like Chain and Group, but as I understand it that contradicts the Exchange paradigm.

小智 9

For good task routing you should create more queues. Celery uses a single exchange with direct bindings to queues. By setting up several queues you can split the work. You can then start additional workers that consume only from certain queues, so the queues with the heaviest workload are processed faster. For example:
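A minimal sketch of that idea; the task and queue names are examples, and it assumes the lowercase Celery 4 setting names:

# Route specific tasks to their own queues (example task paths).
task_routes = {
    "myapp.tasks.heavy_task": {"queue": "heavy"},
    "myapp.tasks.light_task": {"queue": "light"},
}

# Then start dedicated workers per queue, e.g.:
#   celery -A my_proj worker -Q heavy --concurrency=8
#   celery -A my_proj worker -Q light --concurrency=2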

Take a look at how Sentry solves it: https://github.com/getsentry/sentry/blob/master/src/sentry/conf/server.py#L467

Also, if you really want to use multiple exchanges, you can define additional exchanges in your settings file and specify on task_queues which exchange to use for which route. Keep the exchanges direct in Celery; that way you can still switch to another broker if you ever need to.

import kombu

# Two direct exchanges, each bound to its own queue.
first_exchange = kombu.Exchange(name="first", type="direct")
second_exchange = kombu.Exchange(name="second", type="direct")

task_queues = [
    kombu.Queue(
        name="queue1",
        exchange=first_exchange,
        routing_key="queue1",
    ),
    kombu.Queue(
        name="queue2",
        exchange=second_exchange,
        routing_key="queue2",
    ),
]
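With those queues defined, pointing a task at a specific exchange is just a matter of the routing options on apply_async (the task import here is a hypothetical example):

from myapp.tasks import my_task  # hypothetical task module

# Address the queue by name; Celery resolves the exchange binding...
my_task.apply_async(args=(1, 2), queue="queue2")

# ...or name the exchange and routing key explicitly:
my_task.apply_async(args=(1, 2), exchange="second", routing_key="queue2")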

I ended up on this question several times while trying to solve posting messages from Celery tasks to another exchange that is not used by Celery. I thought I would share the result of my findings in case someone else ends up here with the same question.

This uses Celery 4.3, not django-celery, which is no longer needed with Django.

I have a Django application that, besides using Celery, sends "regular" AMQP messages over RabbitMQ to other smaller integration applications and to customers.

So, from within a Celery task, I want to post to an exchange that is separate from the one I use for Celery tasks, and the messages are not tasks.

My initial solution, just to get it working, was to create a new connection in each task. But I thought this would not scale, since I would end up with a lot of connections if my application handled a lot of concurrent tasks. It was also annoying to import the Django settings for my AMQP connection string everywhere I needed a new connection.
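For reference, the throwaway per-task version looked roughly like this (AMQP_URL is a stand-in for my settings entry, and the exchange name is an example):

import kombu
from django.conf import settings

def publish_event(body, routing_key):
    # Naive approach: open (and tear down) a fresh broker connection
    # on every call. AMQP_URL is a hypothetical settings entry.
    with kombu.Connection(settings.AMQP_URL) as conn:
        producer = conn.Producer()
        producer.publish(
            body,
            exchange="external_events",  # example exchange name
            routing_key=routing_key,
            retry=True,
        )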

Instead I started to look into whether I could somehow get the current connection from Celery and reuse it to publish to RabbitMQ - preferably using connection pools, as I do in my non-Django consumers and producers.

It turns out that the connection and producer pools are easily available.

Initial setup of my external messaging in celery.py:

from importlib import import_module

import kombu
from celery import Celery
from django.conf import settings

app = Celery("my_proj")
setting_object = import_module(settings.CELERY_CONF_MODULE)
app.config_from_object(setting_object)
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

# Borrow a connection from Celery's own pool to declare the
# external exchange and a bound queue up front.
with app.pool.acquire(block=True) as conn:
    exchange = kombu.Exchange(
        name=settings.AMQP_PUBLISH_TO, type="topic", durable=True, channel=conn
    )
    exchange.declare()

    queue = kombu.Queue(
        name="my_queue",
        exchange=exchange,
        routing_key="my_queue.#",
        channel=conn,
    )
    queue.declare()

In my Celery tasks I use current_app, since the tasks run on workers.

from celery import current_app, shared_task
from django.conf import settings

@shared_task
def my_task(attrs):
    # do something, then build the message body to publish
    body = {"attrs": attrs}

    # Reuse a producer from Celery's pool instead of opening a new
    # broker connection for every task invocation.
    with current_app.producer_pool.acquire(block=True) as producer:
        producer.publish(
            body,
            routing_key="my_queue.test",
            exchange=settings.AMQP_PUBLISH_TO,
            retry=True,
        )

This works really well for me. But you can't really use CELERY_ALWAYS_EAGER: I had problems with that, since the connections are not actually used in eager mode. So my tests needed to be written a bit better, but that was OK.
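If it helps, one way you might structure such a test is to stub out the producer pool with a mock so the task can run inline without a broker; this is an assumption about how to do it, not my exact test code:

from unittest import mock

from myapp.tasks import my_task  # hypothetical task module

def test_my_task_publishes():
    # Replace Celery's broker-backed producer pool with a mock so the
    # inline task run never touches RabbitMQ.
    with mock.patch("celery.app.base.Celery.producer_pool") as pool:
        producer = pool.acquire.return_value.__enter__.return_value
        my_task.apply(args=({"some": "attrs"},))  # runs the task inline
        producer.publish.assert_called_once()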