定期重新启动Python多处理池

Jos*_*ose 5 python python-multiprocessing

我有一个 Python 多处理池,它的工作时间很长,即使经过彻底的调试,它也不够健壮,不会每 24 小时左右失败一次,因为它依赖于许多具有复杂交互的第三方非 Python 工具。另外,底层机器存在某些我无法控制的问题。请注意,失败并不是指整个程序崩溃,而是部分或大部分进程由于某些错误而变得空闲,并且应用程序本身挂起或仅使用未失败的进程继续工作。

我现在的解决方案是定期手动终止该作业,然后从原来的位置重新启动。

即使它并不理想,我现在想做的是:从 Python 代码本身以编程方式定期重新启动多处理池。我真的不在乎这是否意味着在工作中杀死泳池工人。哪种方法是最好的方法?

我的代码如下所示:

with Pool() as p:
    for _ in p.imap_unordered(function, data):
        save_checkpoint()
        log()
Run Code Online (Sandbox Code Playgroud)

我的想法是这样的:

start = 0
end = 1000  # magic number
while start + 1 < len(data):
    current_data = data[start:end]
    with Pool() as p:
        for _ in p.imap_unordered(function, current_data):
            save_checkpoint()
            log()
            start += 1
            end += 1
Run Code Online (Sandbox Code Playgroud)

或者:

start = 0
end = 1000  # magic number
while start + 1 < len(data):
    current_data = data[start:end]
    start_timeout(time=TIMEOUT) # which would be the best way to to do that without breaking multiprocessing?
    try:
        with Pool() as p:
            for _ in p.imap_unordered(function, current_data):
                save_checkpoint()
                log()
                start += 1
                end += 1
    except Timeout:
        pass
    
Run Code Online (Sandbox Code Playgroud)

或者您认为更好的任何建议。任何帮助将不胜感激,谢谢!

2e0*_*byo 3

当前代码的问题在于它直接迭代多处理结果,并且该调用将被阻塞。幸运的是,有一个简单的解决方案:apply_async完全按照文档中的建议使用。但由于您在这里描述用例和失败的方式,我对其进行了一些调整。首先是一个模拟任务:

from multiprocessing import Pool, TimeoutError, cpu_count
from time import sleep
from random import randint


def log():
    print("logging is a dangerous activity: wear a hard hat.")


def work(d):
    sleep(randint(1, 100) / 100)
    print("finished working")
    if randint(1, 10) == 1:
        print("blocking...")
        while True:
            sleep(0.1)

    return d
Run Code Online (Sandbox Code Playgroud)

该工作函数将以 的概率失败0.1,无限期地阻塞。我们创建任务:

data = list(range(100))
nproc = cpu_count()
Run Code Online (Sandbox Code Playgroud)

然后为所有这些生成 future:

while data:
    print(f"== Processing {len(data)} items. ==")
    with Pool(nproc) as p:
        tasks = [p.apply_async(work, (d,)) for d in data]
Run Code Online (Sandbox Code Playgroud)

然后我们可以尝试手动把任务取出来:

        for task in tasks:
            try:
                res = task.get(timeout=1)
                data.remove(res)
                log()
            except TimeoutError:
                failed.append(task)
                if len(failed) < nproc:
                    print(
                        f"{len(failed)} processes are blocked,"
                        f" but {nproc - len(failed)} remain."
                    )
                else:
                    break
Run Code Online (Sandbox Code Playgroud)

这里的控制超时是 的超时.get。它应该与您期望的最长过程一样长。请注意,我们会检测整个池何时被占用并放弃。

但是,由于在您描述的场景中,某些线程将比其他线程花费更长的时间,因此我们可以给“失败”进程一些时间来恢复。因此,每次任务失败时,我们都会快速检查其他任务是否实际上已成功:

            for task in failed:
                try:
                    res = task.get(timeout=0.01)
                    data.remove(res)
                    failed.remove(task)
                    log()
                except TimeoutError:
                    continue
Run Code Online (Sandbox Code Playgroud)

对于您的情况来说,这是否是一个好的补充取决于您的任务是否真的像我猜测的那样不稳定。

退出池的上下文管理器将终止池,因此我们甚至不需要自己处理它。如果您有显着的变化,您可能需要增加池大小(从而增加允许停止的任务数量)或在认为任务“失败”之前允许任务有一个宽限期。