Celery:如何将失败的任务路由到死信队列

oni*_*aek 12 python rabbitmq celery

我是celery的新手,我尝试将这个任务队列集成到我的项目中,但我仍然不知道芹菜如何处理失败的任务,我想把所有这些都保存在一个amqp死信队列中.

根据这里的文档,似乎在启用了acks_late的任务中引发Reject会产生与查询消息相同的效果,然后我们会有一些关于死信队列的话.

所以我在我的celery配置中添加了一个自定义默认队列

celery_app.conf.update(CELERY_ACCEPT_CONTENT=['application/json'],
                       CELERY_TASK_SERIALIZER='json',
                       CELERY_QUEUES=[CELERY_QUEUE,
                                      CELERY_DLX_QUEUE],
                       CELERY_DEFAULT_QUEUE=CELERY_QUEUE_NAME,
                       CELERY_DEFAULT_EXCHANGE=CELERY_EXCHANGE
                       )
Run Code Online (Sandbox Code Playgroud)

我的海带对象看起来像

CELERY_DLX_EXCHANGE = Exchange(CELERY_DLX_EXCHANGE_NAME, type='direct')
CELERY_DLX_QUEUE = Queue(CELERY_DLX_QUEUE_NAME, exchange=DLX_EXCHANGE,
                             routing_key='celery-dlq')

DEAD_LETTER_CELERY_OPTIONS = {'x-dead-letter-exchange': CELERY_DLX_EXCHANGE_NAME,
                          'x-dead-letter-routing-key': 'celery-dlq'}

CELERY_EXCHANGE = Exchange(CELERY_EXCHANGE_NAME,
                               arguments=DEAD_LETTER_CELERY_OPTIONS,
                               type='direct')

CELERY_QUEUE = Queue(CELERY_QUEUE_NAME,
                         exchange=CELERY_EXCHANGE,
                         routing_key='celery-q')
Run Code Online (Sandbox Code Playgroud)

我正在执行的任务是:

class HookTask(Task):
    acks_late = True

def run(self, ctx, data):
    logger.info('{0} starting {1.name}[{1.request.id}]'.format(self.__class__.__name__.upper(), self))
    self.hook_process(ctx, data)


def on_failure(self, exc, task_id, args, kwargs, einfo):
    logger.error('task_id %s failed, message: %s', task_id, exc.message)

def hook_process(self, t_ctx, body):
    # Build context
    ctx = TaskContext(self.request, t_ctx)
    logger.info('Task_id: %s, handling request %s', ctx.task_id, ctx.req_id)
    raise Reject('no_reason', requeue=False)
Run Code Online (Sandbox Code Playgroud)

我用它做了一点测试但是在提出Reject异常时没有结果.

现在我想知道通过覆盖Task.on_failure强制将失败的任务路由强制到死信队列是个好主意.我认为这会有效,但我也认为这个解决方案不是那么干净,因为根据我的红芹菜应该单独做这件事.

谢谢你的帮助.

Hen*_* Li 9

我认为你不应该添加arguments=DEAD_LETTER_CELERY_OPTIONSCELERY_EXCHANGE。您应该将其添加到 CELERY_QUEUE 中queue_arguments=DEAD_LETTER_CELERY_OPTIONS

以下示例是我所做的并且运行良好:

from celery import Celery
from kombu import Exchange, Queue
from celery.exceptions import Reject

app = Celery(
    'tasks',
    broker='amqp://guest@localhost:5672//',
    backend='redis://localhost:6379/0')

dead_letter_queue_option = {
    'x-dead-letter-exchange': 'dlx',
    'x-dead-letter-routing-key': 'dead_letter'
}

default_exchange = Exchange('default', type='direct')
dlx_exchange = Exchange('dlx', type='direct')

default_queue = Queue(
    'default',
    default_exchange,
    routing_key='default',
    queue_arguments=dead_letter_queue_option)
dead_letter_queue = Queue(
    'dead_letter', dlx_exchange, routing_key='dead_letter')

app.conf.task_queues = (default_queue, dead_letter_queue)

app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'


@app.task
def add(x, y):
    return x + y


@app.task(acks_late=True)
def div(x, y):
    try:
        z = x / y
        return z
    except ZeroDivisionError as exc:
        raise Reject(exc, requeue=False)
Run Code Online (Sandbox Code Playgroud)

创建队列后,您应该在“功能”列上看到DLX(死信交换)和DLK(死信路由键)标签。

在此输入图像描述

注意:如果您已经在 RabbitMQ 中创建了以前的队列,则应该删除它们。这是因为 celery 不会删除现有队列并重新创建一个新队列。

  • 你的例子对我不起作用。您必须从“task_queues”中删除“dead_letter_queue”,否则 celery Worker 将连接到该队列并消费消息(不进行处理)。尽管需要另一种方法来创建这些队列。 (3认同)

Kon*_*Pal 5

我有一个类似的案例,也遇到了同样的问题。我还想要一个基于配置而不是硬编码值的解决方案。李恒峰提出的解决方案非常有帮助,帮助我理解了机制和概念。但是死信队列的声明存在问题。具体来说,如果您将 DLQ 注入到 中task_default_queues,则 Celery 会消耗队列并且队列始终为空。因此需要一种手动声明 DL(X/Q) 的方法。

我使用了 Celery 的Bootsteps,因为它们在代码运行的阶段提供了良好的控制。我最初的实验是在应用程序创建后准确地创建它们,但这在进程分叉后创建了停滞的连接,并创建了一个丑陋的异常。通过在该步骤之后运行引导步骤,Pool您可以保证它在每个工作程序分叉且连接池准备就绪后在每个工作程序的开头运行。

最后,我创建了一个装饰器,通过使用 celery 重新引发,将未捕获的异常转换为任务拒绝Reject。对于已经决定如何处理任务的情况(例如重试)要特别注意。

这是一个完整的工作示例。尝试运行该任务div.delay(1, 0)并查看其工作原理。

from celery import Celery
from celery.exceptions import Reject, TaskPredicate
from functools import wraps
from kombu import Exchange, Queue

from celery import bootsteps


class Config(object):

    APP_NAME = 'test'

    task_default_queue = '%s_celery' % APP_NAME
    task_default_exchange = "%s_celery" % APP_NAME
    task_default_exchange_type = 'direct'
    task_default_routing_key = task_default_queue
    task_create_missing_queues = False
    task_acks_late = True

    # Configuration for DLQ support
    dead_letter_exchange = '%s_dlx' % APP_NAME
    dead_letter_exchange_type = 'direct'
    dead_letter_queue = '%s_dlq' % APP_NAME
    dead_letter_routing_key = dead_letter_queue


class DeclareDLXnDLQ(bootsteps.StartStopStep):
    """
    Celery Bootstep to declare the DL exchange and queues before the worker starts
        processing tasks
    """
    requires = {'celery.worker.components:Pool'}

    def start(self, worker):
        app = worker.app

        # Declare DLX and DLQ
        dlx = Exchange(
            app.conf.dead_letter_exchange,
            type=app.conf.dead_letter_exchange_type)

        dead_letter_queue = Queue(
            app.conf.dead_letter_queue,
            dlx,
            routing_key=app.conf.dead_letter_routing_key)

        with worker.app.pool.acquire() as conn:
            dead_letter_queue.bind(conn).declare()


app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.config_from_object(Config)


# Declare default queues
# We bypass the default mechanism tha creates queues in order to declare special queue arguments for DLX support
default_exchange = Exchange(
    app.conf.task_default_exchange,
    type=app.conf.task_default_exchange_type)
default_queue = Queue(
        app.conf.task_default_queue,
        default_exchange,
        routing_key=app.conf.task_default_routing_key,
        queue_arguments={
            'x-dead-letter-exchange': app.conf.dead_letter_exchange,
            'x-dead-letter-routing-key': app.conf.dead_letter_routing_key
        })

# Inject the default queue in celery application
app.conf.task_queues = (default_queue,)

# Inject extra bootstep that declares DLX and DLQ
app.steps['worker'].add(DeclareDLXnDLQ)


def onfailure_reject(requeue=False):
    """
    When a task has failed it will raise a Reject exception so
    that the message will be requeued or marked for insertation in Dead Letter Exchange
    """

    def _decorator(f):
        @wraps(f)
        def _wrapper(*args, **kwargs):

            try:
                return f(*args, **kwargs)
            except TaskPredicate:
                raise   # Do not handle TaskPredicate like Retry or Reject
            except Exception as e:
                print("Rejecting")
                raise Reject(str(e), requeue=requeue)
        return _wrapper

    return _decorator


@app.task()
@onfailure_reject()
def div(x, y):
    return x / y
Run Code Online (Sandbox Code Playgroud)

编辑:我更新了代码以使用 celery (小写)的新配置模式,因为我在 Celery 4.1.0 中发现了一些兼容性问题。