Python - 从另一个队列重试失败的 Celery 任务

Pyt*_*ast 5 python celery django-celery

我正在将数据发布到 Celery 中的网络服务。有时,由于互联网中断,数据不会发布到 Web 服务,并且任务会无限次重试,直到发布。重试该任务是不必要的,因为网络已关闭,因此不需要再次重试。

我想到了一个更好的解决方案,即如果一个任务失败三次(最少重试3次),则将其转移到另一个队列。该队列包含所有失败任务的列表。现在,当互联网启动并且数据通过网络发布时,即正常队列中的任务已完成,然后它开始处理失败任务队列中的任务。这样就不会浪费一次又一次重试任务的CPU内存。

这是我的代码:- 截至目前,我只是再次重试该任务,但我怀疑这是否是正确的方法。

@shared_task(default_retry_delay = 1 * 60, max_retries = 10)
def post_data_to_web_service(data,url):

    try : 
        client = SoapClient(
                            location = url,
                            action = 'http://tempuri.org/IService_1_0/',
                            namespace = "http://tempuri.org/", 
                            soap_ns='soap', ns = False
                            )

        response= client.UpdateShipment(
                                        Weight = Decimal(data['Weight']), 
                                        Length = Decimal(data['Length']), 
                                        Height = Decimal(data['Height']), 
                                        Width =  Decimal(data['Width']) , 
                                        )

    except Exception, exc:
        raise post_data_to_web_service.retry(exc=exc) 
Run Code Online (Sandbox Code Playgroud)

如何同时维护两个队列并尝试从两个队列执行任务。

设置.py

BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
Run Code Online (Sandbox Code Playgroud)

Chi*_*and 6

默认情况下,celery 将所有任务添加到名为 的队列中celery。因此,您可以在这里运行您的任务,当发生异常时,它会重试,一旦达到最大重试次数,您可以将它们转移到新的队列中foo

from celery.exceptions import MaxRetriesExceededError

@shared_task(default_retry_delay = 1 * 60, max_retries = 10)
def post_data_to_web_service(data,url):
    try:
        #do something with given args

 except MaxRetriesExceededError:
        post_data_to_web_service([data, url], queue='foo')

 except Exception, exc:
        raise post_data_to_web_service.retry(exc=exc) 
Run Code Online (Sandbox Code Playgroud)

当您启动工作程序时,此任务将尝试使用给定的数据执行某些操作。如果失败,它将重试 10 次,每次 60 秒。然后,当它遇到时,MaxRetriesExceededError它将相同的任务发布到新队列中foo

要使用这些任务,您必须启动一个新的工作人员

celery worker -l info -A my_app -Q foo
Run Code Online (Sandbox Code Playgroud)

或者如果您使用以下命令启动它,您也可以从默认工作线程中使用此任务

 celery worker -l info -A my_app -Q celery,foo
Run Code Online (Sandbox Code Playgroud)