spa*_*ing 157 python multiprocessing python-multiprocessing
我想学习如何使用Python的multiprocessing包,但我不明白之间的差别map_async和imap.我注意到,这两个map_async和imap是异步执行的.那我什么时候应该使用另一个呢?我应该如何检索返回的结果map_async?
我应该使用这样的东西吗?
def test():
result = pool.map_async()
pool.close()
pool.join()
return result.get()
result=test()
for i in result:
print i
Run Code Online (Sandbox Code Playgroud)
dan*_*ano 421
imap/ imap_unordered和map/ 之间有两个主要区别map_async:
map通过将iterable转换为列表(假设它不是列表)来消耗你的iterable,将它分成块,然后将这些块发送到工作进程中Pool.将iterable分解为块比执行一次一个项目的可迭代中的每个项目更好 - 特别是如果可迭代的大.但是,将iterable转换为列表以进行块化可能会产生非常高的内存成本,因为整个列表需要保存在内存中.
imap不会将您提供的迭代转换为列表,也不会将其分成块(默认情况下).它将一次遍历可迭代的一个元素,并将它们分别发送到工作进程.这意味着您不会将整个迭代转换为列表的内存命中,但它也意味着大型迭代的性能较慢,因为缺少分块.但是,可以通过传递chunksize大于默认值1 的参数来减轻这种情况.
imap/ imap_unordered和map/ 之间的另一个主要区别map_async是,使用imap/ imap_unordered,您可以在工作准备就绪后立即开始接收工作人员的结果,而不必等待所有工作人员完成.使用时map_async,AsyncResult会立即返回a,但是在完成所有对象的处理之前,您无法实际检索该对象的结果,此时它将返回相同的列表map(map实际上是在内部实现的map_async(...).get()).没有办法得到部分结果; 你要么拥有整个结果,要么没有结果.
imap并且imap_unordered两者都立即返回迭代.有了imap,结果将在它们准备好后立即从迭代中产生,同时仍保留输入可迭代的顺序.使用时imap_unordered,无论输入可迭代的顺序如何,结果一旦准备就会产生.所以,说你有这个:
import multiprocessing
import time
def func(x):
time.sleep(x)
return x + 2
if __name__ == "__main__":
p = multiprocessing.Pool()
start = time.time()
for x in p.imap(func, [1,5,3]):
print("{} (Time elapsed: {}s)".format(x, int(time.time() - start)))
Run Code Online (Sandbox Code Playgroud)
这将输出:
3 (Time elapsed: 1s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)
Run Code Online (Sandbox Code Playgroud)
如果您使用p.imap_unordered而不是p.imap,您会看到:
3 (Time elapsed: 1s)
5 (Time elapsed: 3s)
7 (Time elapsed: 5s)
Run Code Online (Sandbox Code Playgroud)
如果你使用p.map或p.map_async().get(),你会看到:
3 (Time elapsed: 5s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)
Run Code Online (Sandbox Code Playgroud)
因此,使用imap/ imap_unorderedover的主要原因map_async是:
接受的答案指出,imap_unordered“结果将在准备好后立即产生”,人们可能会推断结果将按照完成的顺序返回。但我只想说清楚,这在一般情况下是不正确的。文档指出结果以任意顺序返回。考虑下面的程序,使用4:1的池的大小,一个迭代20的尺寸和CHUNKSIZE 5.工人函数的值睡取决于其传递的参数,这也确保一个可变的时间量,在池抓斗没有一个处理所有提交的任务。因此,我希望池中的每个进程都有20 / 4 = 5要处理的任务:
from multiprocessing import Pool
import time
def worker(x):
print(f'x = {x}', flush=True)
time.sleep(.1 * (20 - x))
# return approximate completion time with passed argument:
return time.time(), x
if __name__ == '__main__':
pool = Pool(4)
results = pool.imap_unordered(worker, range(20), chunksize=5)
for t, x in results:
print('result:', t, x)
Run Code Online (Sandbox Code Playgroud)
印刷:
x = 0
x = 5
x = 10
x = 15
x = 16
x = 17
x = 11
x = 18
x = 19
x = 6
result: 1621512513.7737606 15
result: 1621512514.1747007 16
result: 1621512514.4758775 17
result: 1621512514.675989 18
result: 1621512514.7766125 19
x = 12
x = 1
x = 13
x = 7
x = 14
x = 2
result: 1621512514.2716103 10
result: 1621512515.1721854 11
result: 1621512515.9727488 12
result: 1621512516.6744206 13
result: 1621512517.276999 14
x = 8
x = 9
x = 3
result: 1621512514.7695887 5
result: 1621512516.170747 6
result: 1621512517.4713914 7
result: 1621512518.6734042 8
result: 1621512519.7743165 9
x = 4
result: 1621512515.268784 0
result: 1621512517.1698637 1
result: 1621512518.9698756 2
result: 1621512520.671273 3
result: 1621512522.2716706 4
Run Code Online (Sandbox Code Playgroud)
您可以清楚地看到这些结果不是按完成顺序产生的。例如,我被返回1621512519.7743165 9后跟1621512515.268784 0,它是由工作函数返回的比之前返回的结果早 4 秒多。但是,如果我将chunksize值更改为 1,则打印输出变为:
x = 0
x = 1
x = 2
x = 3
x = 4
result: 1621513028.888357 3
x = 5
result: 1621513028.9863524 2
x = 6
result: 1621513029.0838938 1
x = 7
result: 1621513029.1825204 0
x = 8
result: 1621513030.4842813 7
x = 9
result: 1621513030.4852195 6
x = 10
result: 1621513030.4872172 5
x = 11
result: 1621513030.4892178 4
x = 12
result: 1621513031.3908074 11
x = 13
result: 1621513031.4895358 10
x = 14
result: 1621513031.587289 9
x = 15
result: 1621513031.686152 8
x = 16
result: 1621513032.1877549 15
x = 17
result: 1621513032.1896958 14
x = 18
result: 1621513032.1923752 13
x = 19
result: 1621513032.1923752 12
result: 1621513032.2935638 19
result: 1621513032.3927407 18
result: 1621513032.4912949 17
result: 1621513032.5884912 16
Run Code Online (Sandbox Code Playgroud)
这是按完成顺序。但是,我不愿声明,如果将chunksize值指定为 1 ,imap_unordered 将始终返回可用结果,尽管基于此实验似乎是这种情况,因为文档没有做出这样的声明。
讨论
当CHUNKSIZE指定的5,20个任务被放置在一个单一的输入队列为4个进程在池中过程中尺寸5的块使变为空闲将起飞队列的5个任务的下一个块的处理并依次处理它们中的每一个,然后再次空闲。因此,第一个进程将处理x参数 0 到 4,第二个进程将处理x参数 5 到 9,等等。这就是为什么您看到初始x值打印为 0、5、10 和 15 的原因。
但是,虽然x参数 0 的结果在x参数 9的结果之前完成,但结果似乎是作为块一起写出来的,因此x参数 0的结果将不会返回,直到x在同一队列中排队的参数的结果块(即 1、2、3 和 4)也可用。