多处理 - 映射列表,终止超出超时限制的进程

pir*_*pir 9 python multiprocessing python-multiprocessing

我有一个我想要使用多处理修改的元素列表.问题是对于某些特定的输入(在尝试之前不可观察),我的部分功能会停止.我已经在概念上用下面的代码展示了这个函数,其中函数sometimes_stalling_processing()偶尔会无限期地停止.

为了将其置于上下文中,我正在使用Web scraper处理一堆链接,并且即使在请求模块中使用超时,其中一些链接也会停止.我尝试了不同的方法(例如使用eventlet),但得出的结论是,在多处理级别处理它可能更容易.

def stable_processing(obs):
    ...
    return processed_obs

def sometimes_stalling_processing(obs):
    ...
    return processed_obs

def extract_info(obs):
    new_obs = stable_processing(obs)
    try:
        new_obs = sometimes_stalling_processing(obs)
    except MyTimedOutError: # error doesn't exist, just here for conceptual purposes
        pass
    return new_obs

pool = Pool(processes=n_threads)
processed_dataset = pool.map(extract_info, dataset)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)

这个问题(如何在超时后中断多处理.Pool中的任务?)似乎非常相似,但我一直无法将其转换为使用map而不是apply.我也尝试过使用该eventlet软件包,但这不起作用.请注意,我使用的是Python 2.7.

如何pool.map()对个别观察结果进行超时并杀死sometimes_stalling_processing

nox*_*fox 13

你可以看一下鹅卵石图书馆.

from pebble import ProcessPool
from concurrent.futures import TimeoutError

def sometimes_stalling_processing(obs):
    ...
    return processed_obs

with ProcessPool() as pool:
    future = pool.map(sometimes_stalling_processing, dataset, timeout=10)

    iterator = future.result()

    while True:
        try:
            result = next(iterator)
        except StopIteration:
            break
        except TimeoutError as error:
            print("function took longer than %d seconds" % error.args[1])
Run Code Online (Sandbox Code Playgroud)

文档中的更多示例.