如何让celery任务从任务中失败?

Mei*_*ilo 30 celery celery-task

在某些情况下,我想让celery任务从该任务中失败.我尝试了以下方法:

from celery.task import task
from celery import states

@task()
def run_simulation():
    if some_condition:
        run_simulation.update_state(state=states.FAILURE)
        return False
Run Code Online (Sandbox Code Playgroud)

但是,该任务仍然报告成功:

任务sim.tasks.run_simulation [9235e3a7-c6d2-4219-bbc7-acf65c816e65]成功于1.17847704887s:False

似乎状态只能在任务运行时修改,一旦完成 - 芹菜将状态更改为它认为的结果(参考此问题).有没有办法,通过提出异常来使任务失败,让芹菜返回任务失败?

Pie*_*rre 23

更好的方法是将任务状态更新为FAILURE然后引发Ignore异常,因为返回任何值都会将任务记录为成功,例如:

from celery import Celery, states
from celery.exceptions import Ignore

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task(bind=True)
def run_simulation(self):
    if some_condition:
        # manually update the task state
        self.update_state(
            state = states.FAILURE,
            meta = 'REASON FOR FAILURE'
        )

        # ignore the task so no other state is recorded
        raise Ignore()
Run Code Online (Sandbox Code Playgroud)

  • 请注意,这_不会_触发 task_failure 信号,因此不会触发您要附加到它们的任何处理程序,这并不好。 (6认同)

小智 10

我想进一步扩展皮埃尔的答案,因为我在使用建议的解决方案时遇到了一些问题。

为了在将任务状态更新为 states.FAILURE 时允许自定义字段,还必须模拟 FAILURE 状态将具有的一些属性(注意 exc_type 和 exc_message),虽然解决方案将终止任务,但任何查询状态的尝试(对于例如 - 获取“失败原因”值)将失败。

以下是我摘自的参考片段: https ://www.distributedpython.com/2018/09/28/celery-task-states/

@app.task(bind=True)
def task(self):
    try:
        raise ValueError('Some error')
    except Exception as ex:
        self.update_state(
            state=states.FAILURE,
            meta={
                'exc_type': type(ex).__name__,
                'exc_message': traceback.format_exc().split('\n'),
                'custom': '...'
            })
        raise Ignore()
Run Code Online (Sandbox Code Playgroud)


Mei*_*ilo 2

我从 Ask Solem 那里得到了关于这个问题的有趣答复,他提出了一个“after_return”处理程序来解决这个问题。这可能是未来一个有趣的选择。

与此同时,当我想让任务失败时,我只需从任务中返回一个字符串“FAILURE”,然后按如下方式检查即可解决该问题:

result = AsyncResult(task_id)
if result.state == 'FAILURE' or (result.state == 'SUCCESS' and result.get() == 'FAILURE'):
    # Failure processing task 
Run Code Online (Sandbox Code Playgroud)

  • 如果条件可以写为“READY_STATES | result.state” EXCEPTION_STATES:`其中`from celery.states import READY_STATES, EXCEPTION_STATES, UNREADY_STATES` (3认同)