multiprocessing.Pool:map_async和imap有什么区别?

spa*_*ing 157 python multiprocessing python-multiprocessing

我想学习如何使用Python的multiprocessing包,但我不明白之间的差别map_asyncimap.我注意到,这两个map_asyncimap是异步执行的.那我什么时候应该使用另一个呢?我应该如何检索返回的结果map_async

我应该使用这样的东西吗?

def test():
    result = pool.map_async()
    pool.close()
    pool.join()
    return result.get()

result=test()
for i in result:
    print i
Run Code Online (Sandbox Code Playgroud)

dan*_*ano 421

imap/ imap_unorderedmap/ 之间有两个主要区别map_async:

  1. 他们使用迭代的方式传递给他们.
  2. 他们将结果返回给你的方式.

map通过将iterable转换为列表(假设它不是列表)来消耗你的iterable,将它分成块,然后将这些块发送到工作进程中Pool.将iterable分解为块比执行一次一个项目的可迭代中的每个项目更好 - 特别是如果可迭代的大.但是,将iterable转换为列表以进行块化可能会产生非常高的内存成本,因为整个列表需要保存在内存中.

imap不会将您提供的迭代转换为列表,也不会将其分成块(默认情况下).它将一次遍历可迭代的一个元素,并将它们分别发送到工作进程.这意味着您不会将整个迭代转换为列表的内存命中,但它也意味着大型迭代的性能较慢,因为缺少分块.但是,可以通过传递chunksize大于默认值1 的参数来减轻这种情况.

imap/ imap_unorderedmap/ 之间的另一个主要区别map_async是,使用imap/ imap_unordered,您可以在工作准备就绪后立即开始接收工作人员的结果,而不必等待所有工作人员完成.使用时map_async,AsyncResult会立即返回a,但是在完成所有对象的处理之前,您无法实际检索该对象的结果,此时它将返回相同的列表map(map实际上是在内部实现的map_async(...).get()).没有办法得到部分结果; 你要么拥有整个结果,要么没有结果.

imap并且imap_unordered两者都立即返回迭代.有了imap,结果将在它们准备好后立即从迭代中产生,同时仍保留输入可迭代的顺序.使用时imap_unordered,无论输入可迭代的顺序如何,结果一旦准备就会产生.所以,说你有这个:

import multiprocessing
import time

def func(x):
    time.sleep(x)
    return x + 2

if __name__ == "__main__":    
    p = multiprocessing.Pool()
    start = time.time()
    for x in p.imap(func, [1,5,3]):
        print("{} (Time elapsed: {}s)".format(x, int(time.time() - start)))
Run Code Online (Sandbox Code Playgroud)

这将输出:

3 (Time elapsed: 1s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)
Run Code Online (Sandbox Code Playgroud)

如果您使用p.imap_unordered而不是p.imap,您会看到:

3 (Time elapsed: 1s)
5 (Time elapsed: 3s)
7 (Time elapsed: 5s)
Run Code Online (Sandbox Code Playgroud)

如果你使用p.mapp.map_async().get(),你会看到:

3 (Time elapsed: 5s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)
Run Code Online (Sandbox Code Playgroud)

因此,使用imap/ imap_unorderedover的主要原因map_async是:

  1. 您的iterable足够大,将其转换为列表会导致您耗尽/使用太多内存.
  2. 您希望能够在完成所有结果之前开始处理结果.

  • 这种描述应该在官方的`Pool`文档中,而不是[现有的枯燥的](https://docs.python.org/3.6/library/multiprocessing.html#multiprocessing.pool.Pool). (38认同)
  • @HarshDaftary`apply`将一个任务发送给一个工作进程,然后阻塞直到它完成.`apply_async`将一个任务发送到工作进程,然后立即返回一个`AsyncResult`对象,该对象可用于等待任务完成并检索结果.`apply`是通过简单地调用`apply_async(...).get()`来实现的 (8认同)
  • @BallpointBen 一旦完成,它将继续进行下一项工作。排序在父进程中处理。 (2认同)
  • 如果您根本不关心返回结果,并且将过程的结果写入磁盘供以后使用,会发生什么情况? (2认同)

Boo*_*boo 5

接受的答案指出,imap_unordered“结果将在准备好后立即产生”,人们可能会推断结果将按照完成的顺序返回。但我只想说清楚,这在一般情况下是不正确。文档指出结果以任意顺序返回。考虑下面的程序,使用4:1的池的大小,一个迭代20的尺寸和CHUNKSIZE 5.工人函数的值睡取决于其传递的参数,这也确保一个可变的时间量,在池抓斗没有一个处理所有提交的任务。因此,我希望池中的每个进程都有20 / 4 = 5要处理的任务:

from multiprocessing import Pool
import time

def worker(x):
    print(f'x = {x}', flush=True)
    time.sleep(.1 * (20 - x))
    # return approximate completion time with passed argument:
    return time.time(), x

if __name__ == '__main__':
    pool = Pool(4)
    results = pool.imap_unordered(worker, range(20), chunksize=5)
    for t, x in results:
        print('result:', t, x)
Run Code Online (Sandbox Code Playgroud)

印刷:

x = 0
x = 5
x = 10
x = 15
x = 16
x = 17
x = 11
x = 18
x = 19
x = 6
result: 1621512513.7737606 15
result: 1621512514.1747007 16
result: 1621512514.4758775 17
result: 1621512514.675989 18
result: 1621512514.7766125 19
x = 12
x = 1
x = 13
x = 7
x = 14
x = 2
result: 1621512514.2716103 10
result: 1621512515.1721854 11
result: 1621512515.9727488 12
result: 1621512516.6744206 13
result: 1621512517.276999 14
x = 8
x = 9
x = 3
result: 1621512514.7695887 5
result: 1621512516.170747 6
result: 1621512517.4713914 7
result: 1621512518.6734042 8
result: 1621512519.7743165 9
x = 4
result: 1621512515.268784 0
result: 1621512517.1698637 1
result: 1621512518.9698756 2
result: 1621512520.671273 3
result: 1621512522.2716706 4
Run Code Online (Sandbox Code Playgroud)

您可以清楚地看到这些结果不是按完成顺序产生的。例如,我被返回1621512519.7743165 9后跟1621512515.268784 0,它是由工作函数返回的比之前返回的结果早 4 秒多。但是,如果我将chunksize值更改为 1,则打印输出变为:

x = 0
x = 1
x = 2
x = 3
x = 4
result: 1621513028.888357 3
x = 5
result: 1621513028.9863524 2
x = 6
result: 1621513029.0838938 1
x = 7
result: 1621513029.1825204 0
x = 8
result: 1621513030.4842813 7
x = 9
result: 1621513030.4852195 6
x = 10
result: 1621513030.4872172 5
x = 11
result: 1621513030.4892178 4
x = 12
result: 1621513031.3908074 11
x = 13
result: 1621513031.4895358 10
x = 14
result: 1621513031.587289 9
x = 15
result: 1621513031.686152 8
x = 16
result: 1621513032.1877549 15
x = 17
result: 1621513032.1896958 14
x = 18
result: 1621513032.1923752 13
x = 19
result: 1621513032.1923752 12
result: 1621513032.2935638 19
result: 1621513032.3927407 18
result: 1621513032.4912949 17
result: 1621513032.5884912 16
Run Code Online (Sandbox Code Playgroud)

按完成顺序。但是,我不愿声明,如果chunksize值指定为 1 ,imap_unordered 将始终返回可用结果,尽管基于此实验似乎是这种情况,因为文档没有做出这样的声明。

讨论

CHUNKSIZE指定的5,20个任务被放置在一个单一的输入队列为4个进程在池中过程中尺寸5的块使变为空闲将起飞队列的5个任务的下一个块的处理并依次处理它们中的每一个,然后再次空闲。因此,第一个进程将处理x参数 0 到 4,第二个进程将处理x参数 5 到 9,等等。这就是为什么您看到初始x值打印为 0、5、10 和 15 的原因。

但是,虽然x参数 0 的结果在x参数 9的结果之前完成,但结果似乎是作为块一起写出来的,因此x参数 0的结果将不会返回,直到x在同一队列中排队的参数的结果块(即 1、2、3 和 4)也可用。

  • 谢谢,这是一个好点。我同意你的观察,看起来 giben 结果值只有在它所属的整个块完成时才可供父级使用。 (4认同)