Python 3.2引入了Concurrent Futures,它似乎是旧版线程和多处理模块的一些高级组合.
与旧的多处理模块相比,在CPU绑定任务中使用它有什么优缺点?
这篇文章表明他们更容易合作 - 是这样吗?
我不确定这是否更像是一个操作系统问题,但我想我会问这里,以防任何人从Python的结尾有一些见解.
我一直在尝试使用一个CPU密集型for循环joblib,但是我发现不是将每个工作进程分配给不同的核心,我最终将它们全部分配到同一个核心而没有性能提升.
这是一个非常简单的例子......
from joblib import Parallel,delayed
import numpy as np
def testfunc(data):
# some very boneheaded CPU work
for nn in xrange(1000):
for ii in data[0,:]:
for jj in data[1,:]:
ii*jj
def run(niter=10):
data = (np.random.randn(2,100) for ii in xrange(niter))
pool = Parallel(n_jobs=-1,verbose=1,pre_dispatch='all')
results = pool(delayed(testfunc)(dd) for dd in data)
if __name__ == '__main__':
run()
Run Code Online (Sandbox Code Playgroud)
...这是我在htop脚本运行时看到的内容:

我在一台4核的笔记本电脑上运行Ubuntu 12.10(3.5.0-26).显然joblib.Parallel是为不同的工作者生成单独的进程,但有没有办法让这些进程在不同的内核上执行?
我正在学习如何使用Python中threading的multiprocessing模块和并行运行某些操作并加快我的代码.
我发现这很难(可能因为我没有任何理论背景)来理解一个threading.Thread()对象和一个对象之间的区别multiprocessing.Process().
此外,我并不完全清楚如何实例化一个作业队列,并且只有4个(例如)它们并行运行,而另一个则在执行之前等待资源释放.
我发现文档中的示例清晰,但不是很详尽; 一旦我尝试使事情复杂化,我就会收到许多奇怪的错误(比如一种无法腌制的方法,等等).
那么,我什么时候应该使用threading和multiprocessing模块?
您能否将我链接到一些资源,解释这两个模块背后的概念以及如何正确使用它们来完成复杂的任务?
python parallel-processing multithreading process multiprocessing
假设我有一个大内存numpy数组,我有一个函数func,它接受这个巨大的数组作为输入(连同一些其他参数).func具有不同参数可以并行运行.例如:
def func(arr, param):
# do stuff to arr, param
# build array arr
pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]
Run Code Online (Sandbox Code Playgroud)
如果我使用多处理库,那么这个巨型数组将被多次复制到不同的进程中.
有没有办法让不同的进程共享同一个数组?此数组对象是只读的,永远不会被修改.
更复杂的是,如果arr不是一个数组,而是一个任意的python对象,有没有办法分享它?
[EDITED]
我读了答案,但我仍然有点困惑.由于fork()是copy-on-write,因此在python多处理库中生成新进程时不应调用任何额外的成本.但是下面的代码表明存在巨大的开销:
from multiprocessing import Pool, Manager
import numpy as np;
import time
def f(arr):
return len(arr)
t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;
pool = Pool(processes = 6)
t = …Run Code Online (Sandbox Code Playgroud) python parallel-processing numpy shared-memory multiprocessing
一个程序,它创建可在可连接队列上工作的多个进程Q,并最终可能操纵全局字典D来存储结果.(因此每个子进程可用于D存储其结果,并查看其他子进程正在生成的结果)
如果我在子进程中打印字典D,我会看到已对其进行的修改(即在D上).但是在主进程加入Q之后,如果我打印D,那就是空的dict!
我知道这是一个同步/锁定问题.有人能告诉我这里发生了什么,以及如何同步访问D?
我有以下设置:
do_some_processing(filename):
for line in file(filename):
if line.split(',')[0] in big_lookup_object:
# something here
if __name__ == '__main__':
big_lookup_object = marshal.load('file.bin')
pool = Pool(processes=4)
print pool.map(do_some_processing, glob.glob('*.data'))
Run Code Online (Sandbox Code Playgroud)
我正在将一些大对象加载到内存中,然后创建一个需要使用该大对象的工作池.大对象以只读方式访问,我不需要在进程之间传递它的修改.
我的问题是:加载到共享内存中的大对象,如果我在unix/c中生成进程,或者每个进程是否加载了自己的大对象副本?
更新:进一步澄清 - big_lookup_object是一个共享查找对象.我不需要拆分它并单独处理它.我需要保留一份副本.我需要分割它的工作是读取许多其他大文件,并在查找对象中查找这些大文件中的项目.
进一步更新:数据库是一个很好的解决方案,memcached可能是一个更好的解决方案,磁盘上的文件(shelve或dbm)可能更好.在这个问题中,我对内存解决方案特别感兴趣.对于最终的解决方案,我将使用hadoop,但我想看看我是否也可以拥有本地内存版本.
我想在共享内存中使用numpy数组与多处理模块一起使用.困难是使用它像一个numpy数组,而不仅仅是一个ctypes数组.
from multiprocessing import Process, Array
import scipy
def f(a):
a[0] = -a[0]
if __name__ == '__main__':
# Create the array
N = int(10)
unshared_arr = scipy.rand(N)
arr = Array('d', unshared_arr)
print "Originally, the first two elements of arr = %s"%(arr[:2])
# Create, start, and finish the child processes
p = Process(target=f, args=(arr,))
p.start()
p.join()
# Printing out the changed values
print "Now, the first two elements of arr = %s"%arr[:2]
Run Code Online (Sandbox Code Playgroud)
这会产生如下输出:
Originally, the first two elements of arr = …Run Code Online (Sandbox Code Playgroud) 了解Python 多处理(来自PMOTW文章),并希望对该join()方法的具体用途有所了解.
在2008年的一个旧教程中,它声明如果没有p.join()下面代码中的调用,"子进程将处于空闲状态而不会终止,成为僵尸,你必须手动杀死".
from multiprocessing import Process
def say_hello(name='world'):
print "Hello, %s" % name
p = Process(target=say_hello)
p.start()
p.join()
Run Code Online (Sandbox Code Playgroud)
我添加了一个打印输出PID以及一个time.sleep测试,据我所知,该过程自行终止:
from multiprocessing import Process
import sys
import time
def say_hello(name='world'):
print "Hello, %s" % name
print 'Starting:', p.name, p.pid
sys.stdout.flush()
print 'Exiting :', p.name, p.pid
sys.stdout.flush()
time.sleep(20)
p = Process(target=say_hello)
p.start()
# no p.join()
Run Code Online (Sandbox Code Playgroud)
在20秒内:
936 ttys000 0:00.05 /Library/Frameworks/Python.framework/Versions/2.7/Reso
938 ttys000 0:00.00 /Library/Frameworks/Python.framework/Versions/2.7/Reso …Run Code Online (Sandbox Code Playgroud) 我经常需要将函数应用于非常大的DataFrame(混合数据类型)组,并希望利用多个核心.
我可以从组中创建一个迭代器并使用多处理模块,但它效率不高,因为必须对每个组和函数的结果进行pickle以便在进程之间进行消息传递.
有没有办法避免酸洗甚至避免DataFrame完全复制?看起来多处理模块的共享存储器功能仅限于numpy数组.还有其他选择吗?
我试图重写一些csv读取代码,以便能够在Python 3.2.2中的多个核心上运行它.我尝试使用Pool多处理的对象,我从工作示例改编(并且已经为我的项目的另一部分工作).我遇到了一条错误消息,我发现很难解密和排除故障.
错误:
Traceback (most recent call last):
File "parser5_nodots_parallel.py", line 256, in <module>
MG,ppl = csv2graph(r)
File "parser5_nodots_parallel.py", line 245, in csv2graph
node_chunks)
File "/Library/Frameworks/Python.framework/Versions/3.2/lib/python3.2/multiprocessing/pool.py", line 251, in map
return self.map_async(func, iterable, chunksize).get()
File "/Library/Frameworks/Python.framework/Versions/3.2/lib/python3.2/multiprocessing/pool.py", line 552, in get
raise self._value
AttributeError: __exit__
Run Code Online (Sandbox Code Playgroud)
相关代码:
import csv
import time
import datetime
import re
from operator import itemgetter
from multiprocessing import Pool
import itertools
def chunks(l,n):
"""Divide a list of nodes `l` in `n` chunks"""
l_c = iter(l)
while …Run Code Online (Sandbox Code Playgroud) multiprocessing ×10
python ×10
numpy ×3
python-3.x ×2
blas ×1
linux ×1
pandas ×1
pool ×1
process ×1
shared ×1