如何在Python中并行化列表理解计算?

phy*_*nfo 43 python parallel-processing list-comprehension

列表推导和地图计算都应该 - 至少在理论上 - 相对容易并行化:列表理解中的每个计算都可以独立于所有其他元素的计算来完成.例如在表达式中

[ x*x for x in range(1000) ]
Run Code Online (Sandbox Code Playgroud)

每个x*x-计算可以(至少在理论上)并行完成.

我的问题是:是否有任何Python-Module/Python-Implementation/Python Programming-Trick来并行化列表理解计算(为了使用所有16​​/32/...核心或通过计算机网格分配计算或在云上)?

Mah*_*der 33

正如肯所说,它不能,但是使用2.6的多处理模块,并行计算很容易.

import multiprocessing

try:
    cpus = multiprocessing.cpu_count()
except NotImplementedError:
    cpus = 2   # arbitrary default


def square(n):
    return n * n

pool = multiprocessing.Pool(processes=cpus)
print(pool.map(square, range(1000)))
Run Code Online (Sandbox Code Playgroud)

文档中还有一些示例说明如何使用Managers执行此操作,这也应该允许分布式计算.

  • 这将在Windows上创建单独的进程,因为它缺少fork().这是一个昂贵的解决方案. (3认同)
  • 你不需要做`Pool(processes = cpus)`.如果你没有给出任何东西(或给出None),那么«使用cpu_count()返回的数字»([ref](https://docs.python.org/2/library/multiprocessing.html#multiprocessing. pool.multiprocessing.Pool)),所以它已经做了你正在做的事情. (2认同)

Sha*_*hin 8

关于列表理解的自动并行化

恕我直言,如果没有附加信息(例如在OpenMP中使用指令提供的信息),或者将其限制为仅涉及内置类型/方法的表达式,则列表理解的有效自动并行是不可能的.

除非保证对每个列表项执行的处理没有副作用,否则如果无序执行,结果可能无效(或至少不同).

# Artificial example
counter = 0

def g(x): # func with side-effect
    global counter
    counter = counter + 1
    return x + counter

vals = [g(i) for i in range(100)] # diff result when not done in order
Run Code Online (Sandbox Code Playgroud)

还有任务分配问题.问题空间应该如何分解?

如果每个元素的处理形成一个任务(〜任务场),那么当有许多元素都涉及普通计算时,管理任务的开销将淹没并行化的性能提升.

人们还可以采用数据分解方法,其中问题空间在可用进程之间平均分配.

列表理解也适用于生成器的事实使得这有点棘手,但是如果预迭代它的开销是可接受的,那么这可能不是显示阻塞.当然,如果后续项目过早地重复,则还存在具有副作用的发生器的可能性,其可以改变结果.非常不可能,但可能.

更大的担忧是跨流程的负载不平衡.无法保证每个元素都需要相同的时间来处理,因此静态分区数据可能会导致一个进程完成大部分工作,而闲置您的时间.

将列表分解为较小的块并在每个子进程可用时将它们分开是一个很好的折衷方案,但是,对于块大小的良好选择将取决于应用程序,因此如果没有来自用户的更多信息则不可行.

备择方案

如其他几个答案所述,根据一个要求,有许多方法和并行计算模块/框架可供选择.

仅使用MPI(在C中)并没有使用Python进行并行处理的经验,我无法保证任何(虽然,快速扫描, 多处理,jug,pppyro脱颖而出).

如果要求尽可能贴近列表理解,那么jug似乎是最接近的匹配.从本教程中,跨多个实例分发任务可以很简单:

from jug.task import Task
from yourmodule import process_data
tasks = [Task(process_data,infile) for infile in glob('*.dat')]
Run Code Online (Sandbox Code Playgroud)

虽然它做了类似的事情multiprocessing.Pool.map(),但jug可以使用不同的后端来同步进程和存储中间结果(redis,文件系统,内存中),这意味着进程可以跨越集群中的节点.


Geo*_*tin 5

使用新的3.2 concurrent.futures包中的futures.{Thread,Process}PoolExecutor.map(func, *iterables, timeout=None)futures.as_completed(future_instances, timeout=None)函数可能会有所帮助.

它也可作为2.6+ 反向移植.


Fre*_*Foo 5

对于共享内存并行性,我建议joblib

from joblib import delayed, Parallel

def square(x): return x*x
values = Parallel(n_jobs=NUM_CPUS)(delayed(square)(x) for x in range(1000))
Run Code Online (Sandbox Code Playgroud)


Ste*_*ang 5

正如上面的答案所指出的,这实际上很难自动完成。然后我认为问题实际上是如何以最简单的方式做到这一点。理想情况下,解决方案不需要您知道诸如“我有多少个内核”之类的信息。您可能想要的另一个属性是仍然能够在单个可读行中进行列表理解。

一些给出的答案似乎已经具有这样的好属性,但另一种选择是 Ray ( docs ),它是一个用于编写并行 Python 的框架。在 Ray 中,你会这样做:

import ray

# Start Ray. This creates some processes that can do work in parallel.
ray.init()

# Add this line to signify that the function can be run in parallel (as a
# "task"). Ray will load-balance different `square` tasks automatically.
@ray.remote
def square(x):
    return x * x

# Create some parallel work using a list comprehension, then block until the
# results are ready with `ray.get`.
ray.get([square.remote(x) for x in range(1000)])
Run Code Online (Sandbox Code Playgroud)