芹菜“重试”并更新参数

Mas*_*ero 6 python celery

考虑一个任务将列表作为参数并处理列表中的每个元素,这可能会成功也可能会失败。在这种情况下,如何仅对失败的元素进行“重试”?

例子:

@app.task(bind=True)
def my_test(self, my_list:list):
    new_list = []
    for ele in my_list:
        try:
            do_something_may_fail(ele)
        except:
            new_list.append(ele)
    # how to retry with the new list?
    # like 
    # self.retry(my_list=new_list, countdown=5)
    # or
    # self.apply_async(new_list, countdown=5)

Run Code Online (Sandbox Code Playgroud)

Nie*_*ano 5

解决方案1

\n

将Task.retry与其输入args一起使用kwargs

\n
\n

retry(args=None, kwargs=None, exc=None, throw=True, eta=None, countdown=None, max_retries=None, **options)

\n

重试该任务,将其添加到队列的末尾。

\n

参数

\n

args (元组) \xe2\x80\x93 要重试的位置参数。

\n

kwargs (Dict) \xe2\x80\x93 要重试的关键字参数。

\n
\n

args传递参数时要小心,因为在和中只为相同参数设置值kwargs会导致失败。下面,我选择仅使用args=(<values here>)并清空kwargs={}。您也可以选择以相反的方式使用kwargs={<values here>}和清空args=()

\n

任务.py

\n
from celery import Celery\n\napp = Celery(\'tasks\')\n\n\n@app.task(\n    bind=True,\n    default_retry_delay=0.1,\n    retry_backoff=False,\n    max_retries=None,\n)\ndef my_test(self, some_arg_1: int, my_list: list, some_arg_2: str):\n    print(f"my_test {some_arg_1} {my_list} {some_arg_2}")\n\n    # Filter the failed items. Here, let\'s say only the last item is successful.\n    new_list = my_list[:-1]\n\n    if new_list:\n        self.retry(\n            args=(\n                some_arg_1 + 1,  # some_arg_1 increments per retry\n                new_list,  # Failed items\n                some_arg_2 * 2,  # some_arg_2\'s length doubles per retry\n            ),\n            kwargs={},  # Empty it out to avoid having multiple values for the arguments whether we initially called it with args or kwargs or both.\n        )\n
Run Code Online (Sandbox Code Playgroud)\n

日志(生产者)

\n
>>> from tasks import *\n>>> my_test.apply_async(args=(0, [1,2,3,4,5], "a"))\n<AsyncResult: 121090c6-6b77-4cbd-b1d1-790005e8b18c>\n>>>\n>>> # The above command is just equivalent to the following (just the same result):\n>>> # my_test.apply_async(kwargs={\'some_arg_1\': 0, \'my_list\': [1,2,3,4,5], \'some_arg_2\': "a"})\n>>> # my_test.apply_async(args=(0,), kwargs={\'my_list\': [1,2,3,4,5], \'some_arg_2\': "a"})\n
Run Code Online (Sandbox Code Playgroud)\n

日志(消费者)

\n
[2021-08-25 21:32:06,433: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received\n[2021-08-25 21:32:06,434: WARNING/MainProcess] my_test 0 [1, 2, 3, 4, 5] a\n[2021-08-25 21:32:06,434: WARNING/MainProcess] \n\n[2021-08-25 21:32:06,438: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s\n[2021-08-25 21:32:06,439: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received\n[2021-08-25 21:32:06,539: WARNING/MainProcess] my_test 1 [1, 2, 3, 4] aa\n[2021-08-25 21:32:06,539: WARNING/MainProcess] \n\n[2021-08-25 21:32:06,541: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s\n[2021-08-25 21:32:06,542: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received\n[2021-08-25 21:32:06,640: WARNING/MainProcess] my_test 2 [1, 2, 3] aaaa\n[2021-08-25 21:32:06,640: WARNING/MainProcess] \n\n[2021-08-25 21:32:06,642: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s\n[2021-08-25 21:32:06,643: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received\n[2021-08-25 21:32:06,742: WARNING/MainProcess] my_test 3 [1, 2] aaaaaaaa\n[2021-08-25 21:32:06,743: WARNING/MainProcess] \n\n[2021-08-25 21:32:06,745: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s\n[2021-08-25 21:32:06,747: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received\n[2021-08-25 21:32:06,844: WARNING/MainProcess] my_test 4 [1] aaaaaaaaaaaaaaaa\n[2021-08-25 21:32:06,844: WARNING/MainProcess] \n\n[2021-08-25 21:32:06,844: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] succeeded in 0.0005442450019472744s: None\n
Run Code Online (Sandbox Code Playgroud)\n
    \n
  • 每次重试都会更新所有任务参数:\n
      \n
    • some_arg_1每次重试增加 1,从 的起始值0到最后一个值4
    • \n
    • my_list每次重试丢失 1 个项目,从 的起始值[1, 2, 3, 4, 5]到最后一个值[1]
    • \n
    • some_arg_2每次重试将其大小加倍,从 的起始值"a"到最后一个值"aaaaaaaaaaaaaaaa"
    • \n
    \n
  • \n
\n

解决方案2

\n

只需从任务本身回忆起相同的任务,有点像递归。

\n

任务.py

\n
from celery import Celery\n\napp = Celery(\'tasks\')\n\n\n@app.task\ndef my_test(some_arg_1: int, my_list: list, some_arg_2: str):\n    print(f"my_test {some_arg_1} {my_list} {some_arg_2}")\n\n    # Filter the failed items. Here, let\'s say only the last item is successful.\n    new_list = my_list[:-1]\n\n    if new_list:\n        my_test.apply_async(\n            args=(\n                some_arg_1 + 1,  # some_arg_1 increments per retry\n                new_list,  # Failed items\n                some_arg_2 * 2,  # some_arg_2\'s length doubles per retry\n            ),\n            kwargs={},  # Empty it out to avoid having multiple values for the arguments whether we initially called it with args or kwargs or both.\n        )\n
Run Code Online (Sandbox Code Playgroud)\n

日志(生产者和消费者)

\n
    \n
  • 与解决方案1相同
  • \n
\n