Question by sta*_*yra (score 10). Tags: python, performance, message-queue, shared-memory, multiprocessing
I'm using Python 2.7.5 on a recent-vintage Apple MacBook Pro with four hardware CPUs and eight logical CPUs; that is, the sysctl utility reports:
$ sysctl hw.physicalcpu
hw.physicalcpu: 4
$ sysctl hw.logicalcpu
hw.logicalcpu: 8
I need to perform some fairly complicated processing on a large 1-D list or array, and then save the results as an intermediate output that will be used again later in a subsequent calculation within my application. The structure of my problem lends itself rather naturally to parallelization, so I thought I would try to use Python's multiprocessing module to subdivide the 1-D array into several pieces (either 4 pieces or 8 pieces, I'm not yet sure which), perform the calculations in parallel, and then reassemble the resulting output into its final format afterwards.

I'm trying to decide whether to use multiprocessing.Queue() (a message queue) or multiprocessing.Array() (shared memory) as my preferred mechanism for communicating the child processes' results back up to the main parent process, and I've been experimenting with a couple of "toy" models to make sure I understand how the multiprocessing module actually works. I've come across a rather unexpected result, however: in creating two essentially equivalent solutions to the same problem, the version that uses shared memory for interprocess communication seems to require far more execution time (like 30X more!) than the version using message queues. Below are two different versions of sample source code for a "toy" problem that generates a long sequence of random numbers using parallel processes, and communicates the combined result back to the parent process in two different ways: first using a message queue, and the second time using shared memory.
Here is the version that uses a message queue:
import random
import multiprocessing
import datetime

def genRandom(count, id, q):

    print("Now starting process {0}".format(id))
    output = []

    # Generate a list of random numbers, of length "count"
    for i in xrange(count):
        output.append(random.random())

    # Write the output to a queue, to be read by the calling process
    q.put(output)

if __name__ == "__main__":
    # Number of random numbers to be generated by each process
    size = 1000000

    # Number of processes to create -- the total size of all of the random
    # numbers generated will ultimately be (procs * size)
    procs = 4

    # Create a list of jobs and queues
    jobs = []
    outqs = []
    for i in xrange(0, procs):
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=genRandom, args=(size, i, q))
        jobs.append(p)
        outqs.append(q)

    # Start time of the parallel processing and communications section
    tstart = datetime.datetime.now()

    # Start the processes (i.e. calculate the random number lists)
    for j in jobs:
        j.start()

    # Read out the data from the queues
    data = []
    for q in outqs:
        data.extend(q.get())

    # Ensure all of the processes have finished
    for j in jobs:
        j.join()

    # End time of the parallel processing and communications section
    tstop = datetime.datetime.now()
    tdelta = datetime.timedelta.total_seconds(tstop - tstart)

    msg = "{0} random numbers generated in {1} seconds"
    print(msg.format(len(data), tdelta))
When I run it, I get a result that typically looks like this:
$ python multiproc_queue.py
Now starting process 0
Now starting process 1
Now starting process 2
Now starting process 3
4000000 random numbers generated in 0.514805 seconds
Now, here is the equivalent code segment, refactored slightly so that it uses shared memory instead of queues:
import random
import multiprocessing
import datetime

def genRandom(count, id, d):

    print("Now starting process {0}".format(id))

    # Generate a list of random numbers, of length "count", and write them
    # directly to a segment of an array in shared memory
    for i in xrange(count*id, count*(id+1)):
        d[i] = random.random()

if __name__ == "__main__":
    # Number of random numbers to be generated by each process
    size = 1000000

    # Number of processes to create -- the total size of all of the random
    # numbers generated will ultimately be (procs * size)
    procs = 4

    # Create a list of jobs and a block of shared memory
    jobs = []
    data = multiprocessing.Array('d', size*procs)
    for i in xrange(0, procs):
        p = multiprocessing.Process(target=genRandom, args=(size, i, data))
        jobs.append(p)

    # Start time of the parallel processing and communications section
    tstart = datetime.datetime.now()

    # Start the processes (i.e. calculate the random number lists)
    for j in jobs:
        j.start()

    # Ensure all of the processes have finished
    for j in jobs:
        j.join()

    # End time of the parallel processing and communications section
    tstop = datetime.datetime.now()
    tdelta = datetime.timedelta.total_seconds(tstop - tstart)

    msg = "{0} random numbers generated in {1} seconds"
    print(msg.format(len(data), tdelta))
However, when I run the shared-memory version, a typical result looks more like this:
$ python multiproc_shmem.py
Now starting process 0
Now starting process 1
Now starting process 2
Now starting process 3
4000000 random numbers generated in 15.839607 seconds
My question: why is there such a huge difference in execution speed between the two versions of my code (roughly 0.5 seconds versus 15 seconds, a factor of 30X!)? And in particular, how can I modify the shared-memory version to make it run faster?
Answer by dan*_*ano (score 18):
This is because multiprocessing.Array uses a lock by default to prevent multiple processes from accessing it at once. The documentation says:
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
...
If lock is True (the default) then a new lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be "process-safe".
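You can see what the default gives you directly in the interpreter. A small illustrative check (not from the original answer, though it uses only the documented API): with the default lock=True, multiprocessing.Array() returns a synchronized wrapper whose every access goes through an RLock, while lock=False returns the bare ctypes array.

import multiprocessing

locked = multiprocessing.Array('d', 5)            # lock=True is the default
print(type(locked))                               # SynchronizedArray wrapper
print(locked.get_lock())                          # the RLock guarding each access

raw = multiprocessing.Array('d', 5, lock=False)
print(type(raw))                                  # bare ctypes double array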
This means that you're not really writing to the array concurrently: only one process can access it at a time. Since your example workers are doing almost nothing but array writes, constantly waiting on this lock badly hurts performance. If you use lock=False when you create the array, performance is much better:
With lock=True:
Now starting process 0
Now starting process 1
Now starting process 2
Now starting process 3
4000000 random numbers generated in 4.811205 seconds
With lock=False:
Now starting process 0
Now starting process 3
Now starting process 1
Now starting process 2
4000000 random numbers generated in 0.192473 seconds
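For reference, the only change relative to the shared-memory listing above is the Array construction; everything else stays the same:

# Create the block of shared memory without the synchronizing lock
data = multiprocessing.Array('d', size*procs, lock=False)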
Note that using lock=False means you need to manually protect access to the Array whenever you do something that isn't process-safe. Your example is fine because each process writes only to its own unique slice. But if you tried to read from the array while those writes were happening, or let different processes write to overlapping slices, you would need to manually acquire a lock; a sketch of that follows.
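A minimal sketch of what that manual protection could look like, assuming you create one explicit multiprocessing.Lock and pass it to every process (the lock argument and the snapshot helper are illustrative additions, not part of the answer above):

import random
import multiprocessing

def genRandom(count, id, d, lock):
    # Each writer holds the explicit lock while touching shared state
    for i in xrange(count*id, count*(id+1)):
        with lock:
            d[i] = random.random()

def snapshot(d, lock):
    # A concurrent reader holds the same lock while copying, so it
    # never observes a value mid-write
    with lock:
        return list(d)

if __name__ == "__main__":
    size = 1000000
    procs = 4
    lock = multiprocessing.Lock()
    data = multiprocessing.Array('d', size*procs, lock=False)
    jobs = [multiprocessing.Process(target=genRandom, args=(size, i, data, lock))
            for i in xrange(procs)]
    for j in jobs:
        j.start()
    partial = snapshot(data, lock)    # safe to read while writers are running
    for j in jobs:
        j.join()

Of course, guarding every element write like this reintroduces exactly the serialization that lock=False removed, which is why the fast path in this example is disjoint writes with no locking at all.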