multiprocessing.Pool:何时使用apply,apply_async或map?

Phy*_*win 254 python concurrency multithreading multiprocessing

我没有看到有关Pool.apply,Pool.apply_asyncPool.map的用例的明确示例.我主要是在用Pool.map; 别人有什么好处?

unu*_*tbu 383

回到Python的旧时代,要使用任意参数调用函数,您将使用apply:

apply(f,args,kwargs)
Run Code Online (Sandbox Code Playgroud)

apply虽然不在Python3中,但仍然存在于Python2.7中,并且通常不再使用.如今,

f(*args,**kwargs)
Run Code Online (Sandbox Code Playgroud)

是优选的.该multiprocessing.Pool模块试图提供类似的接口.

Pool.apply就像Python一样apply,除了函数调用是在一个单独的进程中执行的.Pool.apply阻止功能完成.

Pool.apply_async也像Python的内置apply,除了调用立即返回而不是等待结果.AsyncResult返回一个对象.您调用其get()方法来检索函数调用的结果.该get()方法将阻塞,直到功能完成.因此,pool.apply(func, args, kwargs)相当于pool.apply_async(func, args, kwargs).get().

与此相反Pool.apply,该Pool.apply_async方法还具有回调,如果提供,则在函数完成时调用该回调.这可以用来代替打电话get().

例如:

import multiprocessing as mp
import time

def foo_pool(x):
    time.sleep(2)
    return x*x

result_list = []
def log_result(result):
    # This is called whenever foo_pool(i) returns a result.
    # result_list is modified only by the main process, not the pool workers.
    result_list.append(result)

def apply_async_with_callback():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(foo_pool, args = (i, ), callback = log_result)
    pool.close()
    pool.join()
    print(result_list)

if __name__ == '__main__':
    apply_async_with_callback()
Run Code Online (Sandbox Code Playgroud)

可能会产生如下结果

[1, 0, 4, 9, 25, 16, 49, 36, 81, 64]
Run Code Online (Sandbox Code Playgroud)

请注意,与pool.map结果的顺序不同,结果的顺序可能与pool.apply_async调用的顺序不对应.


因此,如果您需要在单独的进程中运行函数,但希望当前进程阻塞直到该函数返回,请使用Pool.apply.比如Pool.apply,Pool.map阻止直到返回完整的结果.

如果希望工作进程池池异步执行许多函数调用,请使用Pool.apply_async.结果的顺序不保证与调用的顺序相同Pool.apply_async.

另请注意,您可以调用许多不同的函数Pool.apply_async(并非所有调用都需要使用相同的函数).

相反,Pool.map将相同的函数应用于许多参数.但是,与之不同的是Pool.apply_async,结果以与参数顺序相对应的顺序返回.

  • 查看[multiprocessing/pool.py](http://hg.python.org/cpython/file/ea421c534305/Lib/multiprocessing/pool.py#l245),你会看到`Pool.map(func,iterable) `相当于`Pool.map_async(func,iterable).get()`.所以`Pool.map`和`Pool.map_async`之间的关系类似于`Pool.apply`和`Pool.apply_async`.`async`命令立即返回,而非`async`命令阻止.`async`命令也有一个回调. (35认同)
  • Windows上的"apply_async_with_callback()"之前应该有`if __name __ =="__ main __"`吗? (10认同)
  • 使用`Pool.map`和`Pool.apply`之间的决定类似于决定何时在Python中使用`map`或`apply`.您只需使用适合工作的工具即可.使用`async`和非`async`版本之间的决定取决于您是否希望调用阻止当前进程和/或是否要使用回调. (6认同)
  • @falsePockets:是的.每次调用`apply_async`都会返回一个`ApplyResult`对象.调用`ApplyResult`的`get`方法将返回相关函数的返回值(如果调用超时,则返回`mp.TimeoutError`.)因此,如果你将`ApplyResult`s放在一个有序列表中,那么调用他们的`get`方法将以相同的顺序返回结果.但是,在这种情况下你可以使用`pool.map`. (5认同)
  • 非常感谢。map_async呢? (2认同)
  • 如果您使用列表理解(例如,官方文档中的此示例)(https://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers),该顺序是否得到保证?例如`multiple_results = [用于range(4)中的i的pool.apply_async(os.getpid,())]``print [用于multi_results中的res的res.get(timeout = 1)] (2认同)
  • @galactica:每次辅助函数成功结束(不引发异常)时,都会在主过程中*调用回调函数。辅助函数将返回值放入队列,主进程中的`pool._result_handler`线程一次处理一次返回值,并将返回值传递给回调函数。因此,您可以确保对每个返回的值都调用一次回调函数,并且这里没有并发问题,因为在主进程中,单个线程按顺序调用了该回调函数。 (2认同)
  • @galactica:`result_list`不是共享变量。尽管它可能存在于工作进程中,但仅应由主进程进行修改和访问。如果您确实在工作进程中修改了`result_list`,则主进程将看不到该值。 (2认同)

Ren*_* B. 90

以下是在一个表的格式,以显示之间的差异的概述Pool.applyPool.apply_asyncPool.mapPool.map_async。选择其中之一时,您必须考虑多参数、并发、阻塞和排序:

                  | Multi-args   Concurrence    Blocking     Ordered-results
---------------------------------------------------------------------
Pool.map          | no           yes            yes          yes
Pool.map_async    | no           yes            no           yes
Pool.apply        | yes          no             yes          no
Pool.apply_async  | yes          yes            no           no
Pool.starmap      | yes          yes            yes          yes
Pool.starmap_async| yes          yes            no           no
Run Code Online (Sandbox Code Playgroud)

笔记:

  • Pool.imapPool.imap_async– map 和 map_async 的懒惰版本。

  • Pool.starmap 方法,非常类似于 map 方法,除了它接受多个参数。

  • Async方法一次提交所有过程,并在完成后检索结果。使用 get 方法获取结果。

  • Pool.map(或Pool.apply)方法与Python内置的map(或apply)非常相似。它们阻塞主进程,直到所有进程完成并返回结果。

例子:

地图

一次性调用一份工作清单

results = pool.map(func, [1, 2, 3])
Run Code Online (Sandbox Code Playgroud)

申请

只能被称为一项工作

for x, y in [[1, 1], [2, 2]]:
    results.append(pool.apply(func, (x, y)))

def collect_result(result):
    results.append(result)
Run Code Online (Sandbox Code Playgroud)

地图异步

一次性调用一份工作清单

pool.map_async(func, jobs, callback=collect_result)
Run Code Online (Sandbox Code Playgroud)

apply_async

只能为一项作业调用并在后台并行执行一项作业

for x, y in [[1, 1], [2, 2]]:
    pool.apply_async(worker, (x, y), callback=collect_result)
Run Code Online (Sandbox Code Playgroud)

星图

pool.map支持多个参数的变体

pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
Run Code Online (Sandbox Code Playgroud)

星图_异步

starmap() 和 map_async() 的组合,它遍历可迭代对象的可迭代对象,并在解包可迭代对象的情况下调用 func。返回一个结果对象。

pool.starmap_async(calculate_worker, [(1, 1), (2, 1), (3, 1)], callback=collect_result)
Run Code Online (Sandbox Code Playgroud)

参考:

在这里找到完整的文档:https : //docs.python.org/3/library/multiprocessing.html

  • 如果 apply 没有得到同意,那它还有什么意义呢?使用? (5认同)
  • Pool.starmap() 阻塞 (3认同)
  • 一张表格/图片胜过一千个字,我总是一次又一次地回到这个答案来选择我想要的功能。 (3认同)

kak*_*ion 73

关于applyvs map:

pool.apply(f, args):f仅在池中的一个工作程序中执行.因此,池中的一个进程将运行f(args).

pool.map(f, iterable):此方法将iterable切换为多个块,并将其作为单独的任务提交给进程池.因此,您可以利用池中的所有进程.

  • 如果iterable是一个生成器怎么办? (3认同)
  • 真正.如果你想异步午餐,也可以看看pool.apply_async."pool_apply阻塞,直到结果准备好,因此apply_async()更适合并行执行工作" (3认同)