Mat*_*teo 4 python join multiprocessing python-multiprocessing
我正在使用multiprocessingpython 库生成 4 个Process()对象来并行化 CPU 密集型任务。任务(来自这篇伟大文章的灵感和代码)是计算列表中每个整数的质因数。
主要.py:
import random
import multiprocessing
import sys
num_inputs = 4000
num_procs = 4
proc_inputs = num_inputs/num_procs
input_list = [int(1000*random.random()) for i in xrange(num_inputs)]
output_queue = multiprocessing.Queue()
procs = []
for p_i in xrange(num_procs):
print "Process [%d]"%p_i
proc_list = input_list[proc_inputs * p_i:proc_inputs * (p_i + 1)]
print " - num inputs: [%d]"%len(proc_list)
# Using target=worker1 HANGS on join
p = multiprocessing.Process(target=worker1, args=(p_i, proc_list, output_queue))
# Using target=worker2 RETURNS with success
#p = multiprocessing.Process(target=worker2, args=(p_i, proc_list, output_queue))
procs.append(p)
p.start()
for p in jobs:
print "joining ", p, output_queue.qsize(), output_queue.full()
p.join()
print "joined ", p, output_queue.qsize(), output_queue.full()
print "Processing complete."
ret_vals = []
while output_queue.empty() == False:
ret_vals.append(output_queue.get())
print len(ret_vals)
print sys.getsizeof(ret_vals)
Run Code Online (Sandbox Code Playgroud)
观察:
worker1,对于大于 4000 个元素的输入列表,主线程就会卡住.join(),等待产生的进程终止并且永不返回。worker2,对于相同的输入列表,代码工作得很好,主线程返回。这让我很困惑,因为worker1和worker2(见下文)之间的唯一区别是前者在其中Queue插入了单独的列表,而后者为每个进程插入了一个列表。
为什么使用worker1和不使用worker2目标会出现死锁?不应该两者(或都不应该)超出Multiprocessing Queue maxsize 限制是 32767吗?
工人 1 与工人 2:
def worker1(proc_num, proc_list, output_queue):
'''worker function which deadlocks'''
for num in proc_list:
output_queue.put(factorize_naive(num))
def worker2(proc_num, proc_list, output_queue):
'''worker function that works'''
workers_stuff = []
for num in proc_list:
workers_stuff.append(factorize_naive(num))
output_queue.put(workers_stuff)
Run Code Online (Sandbox Code Playgroud)
关于 SO有很多类似的问题,但我相信这些问题的核心显然与所有问题不同。
相关链接:
文档对此发出警告:
警告:如上所述,如果子进程将项目放入队列(并且它没有使用 JoinableQueue.cancel_join_thread),那么该进程将不会终止,直到所有缓冲的项目都已刷新到管道中。
这意味着,如果您尝试加入该进程,您可能会遇到死锁,除非您确定放入队列的所有项目都已被消耗。类似地,如果子进程是非守护进程,那么当它尝试加入其所有非守护进程时,父进程可能会在退出时挂起。
虽然 aQueue似乎是无界的,但在幕后排队的项目被缓冲在内存中以避免进程间管道过载。在刷新这些内存缓冲区之前,进程无法正常结束。你worker1()放多了很多项目在排队比你的worker2(),而这一切就是这么简单。请注意,在实现诉诸内存缓冲之前可以排队的项目数量未定义:它可能因操作系统和 Python 版本而异。
正如文档所建议的那样,避免这种情况的正常方法是在您尝试处理之前将.get()所有项目从队列中删除。正如您所发现的,是否有必要这样做取决于每个工作进程已将多少项放入队列中,但未定义。.join()
| 归档时间: |
|
| 查看次数: |
3074 次 |
| 最近记录: |