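Consider a task that takes a list as an argument and processes each element in it, where each element may succeed or fail. In that case, how can I "retry" only the elements that failed?
Example: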
@app.task(bind=True)
def my_test(self, my_list: list):
    new_list = []
    for ele in my_list:
        try:
            do_something_may_fail(ele)
        except Exception:
            # Remember the elements that failed
            new_list.append(ele)
    # how to retry with the new list?
    # like
    # self.retry(my_list=new_list, countdown=5)
    # or
    # self.apply_async(new_list, countdown=5)
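Use Task.retry with its args and kwargs inputs.

retry(args=None, kwargs=None, exc=None, throw=True, eta=None, countdown=None, max_retries=None, **options)
Retry the task, adding it to the back of the queue.

Parameters
args (Tuple) – Positional arguments to retry with.
kwargs (Dict) – Keyword arguments to retry with.

Be careful when passing arguments, because setting a value for the same parameter in both args and kwargs causes a failure. Below, I chose to use only args=(<values here>) and to empty kwargs={}. You could instead do it the other way around, using kwargs={<values here>} and emptying args=().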
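The collision described above can be reproduced without Celery. The sketch below is plain Python and rests on an assumption about Celery's behavior: that retry falls back to the original request's kwargs when kwargs is left as None. The names original_kwargs and new_args are hypothetical stand-ins for the original call and the retry arguments.

```python
def my_test(some_arg_1, my_list, some_arg_2):
    """Plain-function stand-in for the task body (no Celery involved)."""
    return some_arg_1, my_list, some_arg_2


# Hypothetical: the first call passed two parameters by keyword.
original_kwargs = {"my_list": [1, 2, 3], "some_arg_2": "a"}

# Hypothetical: the retry supplies every parameter positionally.
new_args = (1, [1, 2], "aa")

error_message = None
try:
    # New positional args combined with the old kwargs: the same
    # parameters now receive two values each.
    my_test(*new_args, **original_kwargs)
except TypeError as exc:
    error_message = str(exc)

# Emptying the kwargs removes the collision.
result = my_test(*new_args, **{})

print(error_message)
print(result)
```

This is why the answer passes kwargs={} explicitly rather than leaving it unset.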
tasks.py

from celery import Celery

app = Celery('tasks')


@app.task(
    bind=True,
    default_retry_delay=0.1,
    retry_backoff=False,
    max_retries=None,
)
def my_test(self, some_arg_1: int, my_list: list, some_arg_2: str):
    print(f"my_test {some_arg_1} {my_list} {some_arg_2}")

    # Filter the failed items. Here, let's say only the last item is successful.
    new_list = my_list[:-1]

    if new_list:
        self.retry(
            args=(
                some_arg_1 + 1,  # some_arg_1 increments per retry
                new_list,  # Failed items
                some_arg_2 * 2,  # some_arg_2's length doubles per retry
            ),
            kwargs={},  # Empty it out to avoid having multiple values for the arguments, whether we initially called it with args or kwargs or both.
        )

Logs (producer)
>>> from tasks import *
>>> my_test.apply_async(args=(0, [1,2,3,4,5], "a"))
<AsyncResult: 121090c6-6b77-4cbd-b1d1-790005e8b18c>
>>>
>>> # The above command is equivalent to either of the following (same result):
>>> # my_test.apply_async(kwargs={'some_arg_1': 0, 'my_list': [1,2,3,4,5], 'some_arg_2': "a"})
>>> # my_test.apply_async(args=(0,), kwargs={'my_list': [1,2,3,4,5], 'some_arg_2': "a"})

Logs (consumer)
[2021-08-25 21:32:06,433: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,434: WARNING/MainProcess] my_test 0 [1, 2, 3, 4, 5] a
[2021-08-25 21:32:06,434: WARNING/MainProcess]

[2021-08-25 21:32:06,438: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,439: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,539: WARNING/MainProcess] my_test 1 [1, 2, 3, 4] aa
[2021-08-25 21:32:06,539: WARNING/MainProcess]

[2021-08-25 21:32:06,541: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,542: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,640: WARNING/MainProcess] my_test 2 [1, 2, 3] aaaa
[2021-08-25 21:32:06,640: WARNING/MainProcess]

[2021-08-25 21:32:06,642: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,643: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,742: WARNING/MainProcess] my_test 3 [1, 2] aaaaaaaa
[2021-08-25 21:32:06,743: WARNING/MainProcess]

[2021-08-25 21:32:06,745: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,747: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,844: WARNING/MainProcess] my_test 4 [1] aaaaaaaaaaaaaaaa
[2021-08-25 21:32:06,844: WARNING/MainProcess]

[2021-08-25 21:32:06,844: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] succeeded in 0.0005442450019472744s: None

- some_arg_1 was incremented by 1 per retry, from its starting value 0 to its final value 4.
- my_list lost 1 item per retry, from its starting value [1, 2, 3, 4, 5] to its final value [1].
- some_arg_2 doubled in size per retry, from its starting value "a" to its final value "aaaaaaaaaaaaaaaa".

Alternatively, just call the same task again from within the task itself, a bit like recursion.

tasks.py

from celery import Celery

app = Celery('tasks')


@app.task
def my_test(some_arg_1: int, my_list: list, some_arg_2: str):
    print(f"my_test {some_arg_1} {my_list} {some_arg_2}")

    # Filter the failed items. Here, let's say only the last item is successful.
    new_list = my_list[:-1]

    if new_list:
        my_test.apply_async(
            args=(
                some_arg_1 + 1,  # some_arg_1 increments per retry
                new_list,  # Failed items
                some_arg_2 * 2,  # some_arg_2's length doubles per retry
            ),
            kwargs={},  # Empty it out to avoid having multiple values for the arguments, whether we initially called it with args or kwargs or both.
        )

Logs (producer and consumer)
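
To connect either approach back to the loop in the question, the step that collects failed elements can be kept as a small helper. This is only a sketch, not part of the answer above: collect_failed and run_until_done are hypothetical names, and run_until_done is a local stand-in for the broker re-delivering the task. Inside a bound Celery task, the list returned by collect_failed would presumably be passed on as self.retry(args=(failed,), kwargs={}, countdown=5).

```python
from typing import Callable, Iterable, List, Tuple


def collect_failed(items: Iterable, handler: Callable) -> List:
    """Run handler on every item and return the items that raised."""
    failed = []
    for item in items:
        try:
            handler(item)
        except Exception:
            failed.append(item)
    return failed


def run_until_done(items: Iterable, handler: Callable,
                   max_rounds: int = 3) -> Tuple[List, int]:
    """Local simulation of retrying only the failures, round by round."""
    pending = list(items)
    rounds = 0
    while pending and rounds < max_rounds:
        pending = collect_failed(pending, handler)
        rounds += 1
    return pending, rounds


# Hypothetical handler: even numbers fail on their first attempt only.
attempts = {}


def flaky(x: int) -> None:
    attempts[x] = attempts.get(x, 0) + 1
    if x % 2 == 0 and attempts[x] < 2:
        raise ValueError(f"transient failure for {x}")


pending, rounds = run_until_done([1, 2, 3, 4], flaky)
print(pending, rounds)
```

Capping the number of rounds matters more for the recursive apply_async variant, which has no built-in limit comparable to max_retries.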