Python中的Concurrent.futures与多处理3

GIS*_*han 124 python multiprocessing python-3.x

Python 3.2引入了Concurrent Futures,它似乎是旧版线程和多处理模块的一些高级组合.

与旧的多处理模块相比,在CPU绑定任务中使用它有什么优缺点?

这篇文章表明他们更容易合作 - 是这样吗?

Tim*_*ers 118

我不会称之为concurrent.futures"高级" - 它是一个更简单的接口,无论您使用多个线程还是多个进程作为底层并行化噱头,它的工作方式都非常相似.

所以,像"简单的界面"的几乎所有情况下,大同小异的折衷涉及:它有一个浅的学习曲线,这在很大程度上只是因为有可用的要少得多,以学习; 但是,因为它提供的选项较少,它最终可能会让您以更丰富的界面不会让您感到沮丧.

到目前为止,受CPU约束的任务仍然存在,而且这些任务太过于低估,无法说明有意义.对于CPython下的CPU绑定任务,您需要多个进程而不是多个线程才有机会获得加速.但是,获得多少(如果有的话)加速取决于硬件,操作系统的细节,特别是您的特定任务需要多少进程间通信.在幕后,所有进程间并行化噱头都依赖于相同的操作系统原语 - 用于获取这些原语的高级API不是底线速度的主要因素.

编辑:示例

这是您引用的文章中显示的最终代码,但我添加了一个使其工作所需的import语句:

from concurrent.futures import ProcessPoolExecutor
def pool_factorizer_map(nums, nprocs):
    # Let the executor divide the work among processes by using 'map'.
    with ProcessPoolExecutor(max_workers=nprocs) as executor:
        return {num:factors for num, factors in
                                zip(nums,
                                    executor.map(factorize_naive, nums))}
Run Code Online (Sandbox Code Playgroud)

以下是完全相同的事情multiprocessing:

import multiprocessing as mp
def mp_factorizer_map(nums, nprocs):
    with mp.Pool(nprocs) as pool:
        return {num:factors for num, factors in
                                zip(nums,
                                    pool.map(factorize_naive, nums))}
Run Code Online (Sandbox Code Playgroud)

请注意,multiprocessing.Pool在Python 3.3中添加了将对象用作上下文管理器的功能.

哪一个更容易使用?哈哈;-)他们基本相同.

一个区别是Pool支持做的事情,你可能不知道是多么容易的很多不同的方式可以是直到你攀上了学习曲线相当一路上扬.

同样,所有这些不同的方式都是力量和弱点.它们是一种优势,因为在某些情况下可能需要灵活性.它们是一个弱点,因为"最好只有一种明显的方法".一个专门(如果可能)坚持的项目concurrent.futures从长远来看可能更容易维护,因为在其简约API的使用方式方面缺乏无端的新颖性.

  • *"你需要多个进程而不是多个线程才有机会获得加速"*太苛刻了.如果速度很重要; 代码可能已经使用了C库,因此可以释放GIL,例如regex,lxml,numpy. (19认同)
  • @TimPeters在某些方面,`ProcessPoolExecutor`实际上有比'Pool`更多的选项,因为`ProcessPoolExecutor.submit`返回允许取消(`cancel`)的`Future`实例,检查*引发了**异常(`exception`),以及动态添加要在完成时调用的回调(`add_done_callback`).这些功能都不适用于`Pool.apply_async`返回的`AsyncResult`实例.在其他方面,`Pool`有更多的选项,因为`Pool .`初始化器`/`initargs`,`maxtasksperchild`和`context`,以及`Pool`实例暴露的更多方法. (7认同)
  • @JFSebastian,谢谢你补充说 - 也许我应该说"在*纯*CPython下",但我担心在没有讨论GIL的情况下,没有简短的方法可以解释真相. (3认同)
  • 值得一提的是,当使用长IO操作时,线程可能特别有用且足够. (2认同)
  • @max,当然,但请注意,问题不是关于`Pool`,而是关于模块。`Pool` 是 `multiprocessing` 中的一小部分,并且在文档中如此之低,人们需要一段时间才能意识到它甚至存在于 `multiprocessing` 中。这个特定的答案集中在`Pool`上,因为这是OP链接到的所有文章,并且`cf`“更容易使用”对于文章讨论的内容来说根本不是真的。除此之外,`cf` 的 `as_completed()` 也非常方便。 (2认同)

Boo*_*boo 29

也许对于大多数的时候,你需要并行处理的时候,你会发现,无论是ProcessPoolExecutor从类concurrent.futures模块或Pool从类multiprocessing模块将提供等效的设施,把它归结为个人喜好的问题。但每个都提供了一些设施,使某些处理更方便。我想我会指出几个:

提交一批任务时,您有时希望在任务结果(即返回值)可用时立即获得它们。这两种工具都提供了通过回调机制提供来自提交任务的结果的通知:

使用multiprocessing.Pool

import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def process_result(return_value):
    print(return_value)

def main():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(worker_process, args=(i,), callback=process_result)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

使用带有以下内容的回调也可以完成相同的操作,尽管很笨拙concurrent.futures

import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def process_result(future):
    print(future.result())

def main():
    executor = concurrent.futures.ProcessPoolExecutor()
    futures = [executor.submit(worker_process, i) for i in range(10)]
    for future in futures:
        future.add_done_callback(process_result)
    executor.shutdown()

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

这里每个任务都是单独提交的,并为其Future返回一个实例。然后回调必须添加到Future. 最后,当回调被调用时,传递的参数是Future已经完成的任务的实例,result必须调用方法来获取实际的返回值。但是有了这个concurrent.futures模块,实际上根本不需要使用回调。您可以使用以下as_completed方法:

import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(worker_process, i) for i in range(10)]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

worker_process通过使用字典来保存Future实例,很容易将返回值与原始传递的参数联系起来:

import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = {executor.submit(worker_process, i): i for i in range(10)}
        for future in concurrent.futures.as_completed(futures):
            i = futures[future] # retrieve the value that was squared
            print(i, future.result())

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

multiprocessing.Pool有方法imapand imap_unordered,后者允许以任意顺序返回任务结果,但不一定按完成顺序返回。这些方法被认为是一个懒惰的版本map。使用 method map,如果传递的iterable参数没有__len__属性,它将首先转换为 alist并且其长度将用于计算chunksize有效值(如果None作为chunksize参数提供)。因此,您无法通过使用生成器或生成器表达式作为可迭代对象来实现任何存储优化。但是使用方法imapimap_unordered可迭代的可以是生成器或生成器表达式;它将根据需要进行迭代以产生新的提交任务。但这需要默认的chunksize参数为 1,因为通常无法知道可迭代的长度。但这并不能阻止您使用multiprocessing.Pool类使用的相同算法提供合理的值,如果您对迭代的长度(或下例中的确切大小)有很好的近似值:

import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def compute_chunksize(pool_size, iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

def main():
    cpu_count = mp.cpu_count()
    N = 100
    chunksize = compute_chunksize(cpu_count, N)
    with mp.Pool() as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunksize):
            print(result)

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

但是,imap_unordered除非工作进程返回原始调用参数以及返回值,否则无法轻松地将结果与提交的作业联系起来。另一方面,指定chunksizewithimap_unordered和的能力imap,其结果将按可预测的顺序,应该使这些方法比apply_async重复调用方法更有效,这本质上相当于使用 1 的块大小。但如果你这样做需要按完成顺序处理结果,然后确保您应该使用apply_async带有回调函数的方法。它,然而,根据实验,如果你使用一个出现CHUNKSIZE值1与imap_unordered,结果将在完成顺序返回。

包中类的map方法在某一方面与包中的方法相似。此方法不会将其传递的作为生成器表达式的可迭代参数转换为列表以计算有效的chunksize值,这就是chunksize参数默认为 1 的原因,以及为什么如果您传递大型iterables,您应该考虑指定适当的chunksize值. 然而,与,它似乎从我的豁达,你不能开始遍历结果,直到所有的iterables被传递到已经重复。ProcessPoolExecutorconcurrent.futuresPool.imapmultiprocessingPool.imapmap

multiprocessing.Pool班有一个方法apply是将任务提交到游泳池和块,直到结果已经准备就绪。返回值只是传递给apply函数的工作函数的返回值。例如:

import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def main():
    with mp.Pool() as pool:
        print(pool.apply(worker_process, args=(6,)))
        print(pool.apply(worker_process, args=(4,)))

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

concurrent.futures.ProcessPoolExecutor班有没有这样的等价物。您必须针对返回的实例发出 asubmit然后调用。这样做并不困难,但该方法对于适合阻塞任务提交的用例更方便。这种情况是当您有需要线程处理的处理时,因为在线程中完成的大部分工作都是大量 I/O,除了可能是一个非常受 CPU 限制的函数。创建线程的主程序首先创建一个方法,从而在另一个进程中运行代码并释放当前进程以允许其他线程运行。resultFuturePool.applymultiprocessing.Pool实例并将其作为参数传递给所有线程。当线程需要调用大量 CPU 绑定的函数时,它现在使用Pool.apply

concurrent.futures模块具有两个类,ProcessPoolExecutor并且ThreadPoolExecutor具有相同的接口,这一点非常重要。这是一个很好的功能。但是该multiprocessing模块还有一个未记录的ThreadPool类,其接口与Pool

>>> from multiprocessing.pool import Pool
>>> from multiprocessing.pool import ThreadPool
>>> dir(Pool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>> dir(ThreadPool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>>
Run Code Online (Sandbox Code Playgroud)

您可与提交的任务ProcessPoolExecutor.submit,它返回一个Future实例,或者Pool.apply_async,它返回一个AsyncResult实例,检索结果指定超时值:

from concurrent.futures import ProcessPoolExecutor, TimeoutError
from time import sleep


def worker_1():
    while True:
        print('hanging')
        sleep(1)


def main():
    with ProcessPoolExecutor(1) as pool:
        future = pool.submit(worker_1)
        try:
            future.result(3) # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')

if __name__ == '__main__':
    main()
    print("return from main()")
Run Code Online (Sandbox Code Playgroud)

印刷:

hanging
hanging
hanging
timeout
hanging
hanging
hanging
hanging
hanging
hanging
hanging
etc.
Run Code Online (Sandbox Code Playgroud)

调用时主进程future.result(3)TimeoutError在3秒后出现异常,因为提交的任务在那个时间段内没有完成。但是任务继续运行,with ProcessPoolExecutor(1) as pool:阻塞了进程并且块永远不会退出,因此程序不会终止。

hanging
hanging
hanging
timeout
hanging
hanging
hanging
hanging
hanging
hanging
hanging
etc.
Run Code Online (Sandbox Code Playgroud)

印刷:

hanging
hanging
hanging
timeout
return from main()
Run Code Online (Sandbox Code Playgroud)

然而,这一次,即使超时任务仍在继续运行并with阻塞进程,但不会阻止块退出,因此程序正常终止。这样做的原因是Pool实例的上下文管理器将terminate在块退出时执行调用,这会导致池中的所有进程立即终止。这与ProcessPoolExecutor实例的上下文处理程序形成对比,shutdown(wait=True)当它管理的块退出时,它执行调用以等待池中所有进程的终止。multiprocessing.Pool如果您使用上下文处理程序来处理池终止并且存在超时的可能性,那么优势似乎就会出现。

但是由于上下文处理程序multiprocessing.Pool仅用于调用terminate而不是close后跟join,因此您必须确保您提交的所有作业在退出with块之前都已完成,例如通过使用阻塞同步调用提交作业,例如map或 调用返回getAsyncResult对象通过对池实例的调用apply_async或迭代调用的结果imap或通过调用close后跟join

虽然使用 时无法退出,直到超时任务完成ProcessPoolExecutor,您可以取消尚未运行的已提交任务的启动。在下面的演示中,我们有一个大小为 1 的池,因此作业只能连续运行。我们一个接一个地提交 3 个作业,其中前两个作业需要 3 秒才能运行,因为调用time.sleep(3). 我们立即尝试取消前两个作业。第一次取消尝试失败,因为第一个作业已经在运行。但是因为池只有一个进程,第二个作业必须等待第一个作业完成 3 秒才能开始运行,因此取消成功。最后,作业 3 将在作业 1 完成后几乎立即开始和结束,这将在我们开始作业提交后大约 3 秒:

from multiprocessing import Pool, TimeoutError
from time import sleep


def worker_1():
    while True:
        print('hanging')
        sleep(1)

def main():
    with Pool(1) as pool:
        result = pool.apply_async(worker_1, args=())
        try:
            result.get(3) # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')


if __name__ == '__main__':
    main()
    print("return from main()")
Run Code Online (Sandbox Code Playgroud)

印刷:

False
True
Done 1
Hello
3.1249606609344482
Run Code Online (Sandbox Code Playgroud)

  • 这是一个很棒的答案。 (19认同)

gol*_*vok 8

除了其他答案的详细差异列表之外,我个人还遇到过未修复的(截至 2022 年 11 月 20 日)无限期挂起,当其中一个工作人员以某种方式崩溃时,multiprocess.Pool 可能会发生这种情况。(在我的例子中,是 cython 扩展的异常,尽管其他人说当工作人员收到 SIGTERM 等时可能会发生这种情况。)根据ProcessPoolExecutor 的文档,自 python 3.3 以来,它对此一直很稳健。


Mic*_*ner 5

我喜欢concurrent.futures,主要是因为多个函数参数的迭代器:multiprocessing在获取函数的多个参数时有点hacky(没有istarmap()- 等价物starmap()):

import multiprocessing as mp

def power_plus_one(x, y):
    return (x**y) + 1

def wrapper(t):
    return power_plus_one(*t)

with mp.Pool() as pool:
    r = list(pool.imap(wrapper, [(0, 1), (2, 2)]))

print(r)
Run Code Online (Sandbox Code Playgroud)

我发现imap()/imap_unordered()对于进度条tqdm或较大计算的时间估计非常有帮助。在 中concurrents.futures,这非常方便:

def power_plus_one(x, y):
    return (x**y) + 1

o = dict() # dict for output

with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = {executor.submit(power_plus_one, x, y): (x, y) for x, y in [(0, 1), (2, 2)]}
    for future in concurrent.futures.as_completed(futures):
        i = futures[future]
        o[i] = future.result()
print(o)
Run Code Online (Sandbox Code Playgroud)

我也喜欢作为字典的方便的结果映射。:)

使用 tqdm,您可以轻松地:

for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
    ...
Run Code Online (Sandbox Code Playgroud)