从文档来看,Number of allowed automatic retries if computing a result fails.
“结果”是指每个单独的任务还是整个compute()调用?
如果是指整个调用,那么dask.delayed中如何实现每个任务的重试呢?
另外,我不确定重试是否有效,如下面的代码所示。
import dask
import random
@dask.delayed
def add(x, y):
return x + y
@dask.delayed
def divide(sum_i):
n = random.randint(0, 1)
result = sum_i / n
return result
tasks = []
for i in range(3):
sum_i = add(i, i+1)
divide_n = divide(sum_i)
tasks.append(divide_n)
dask.compute(*tasks, retries=1000)
Run Code Online (Sandbox Code Playgroud)
预期输出为 (1, 3, 5),实际输出为 ZeroDivisionError。
如果有人感兴趣,我们可以使用 @retry 装饰器来执行任务,如下所示:
@dask.delayed
@retry(Exception, tries=3, delay=5)
def my_func():
pass
Run Code Online (Sandbox Code Playgroud)
重试装饰器:
from functools import wraps
def retry(exceptions, tries=4, delay=3, backoff=2, logger=None):
"""
Retry calling the decorated function using an exponential backoff.
Args:
exceptions: The exception to check. may be a tuple of
exceptions to check.
tries: Number of times to try (not retry) before giving up.
delay: Initial delay between retries in seconds.
backoff: Backoff multiplier (e.g. value of 2 will double the delay
each retry).
logger: Logger to use.
"""
if not logger:
logger = logging.getLogger(__name__)
def deco_retry(f):
@wraps(f)
def f_retry(*args, **kwargs):
mtries, mdelay = tries, delay
while mtries > 1:
try:
return f(*args, **kwargs)
except exceptions as e:
msg = f"{e}, \nRetrying in {mdelay} seconds..."
logger.warning(msg)
sleep(mdelay)
mtries -= 1
mdelay *= backoff
return f(*args, **kwargs)
return f_retry # true decorator
return deco_retry
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
862 次 |
| 最近记录: |