gal*_*pah 7 python generator multiprocessing embarrassingly-parallel
我想对从文件加载的一些输入数据运行并行计算.(文件可能非常大,所以我使用了一个生成器.)
在一定数量的项目上,我的代码运行正常但超过此阈值程序挂起(某些工作进程不会结束).
有什么建议?(我用python2.7运行它,8个CPU; 5000行仍然可以,7,500不起作用.)
首先,您需要一个输入文件.在bash中生成它:
for i in {0..10000}; do echo -e "$i"'\r' >> counter.txt; done
Run Code Online (Sandbox Code Playgroud)
然后,运行这个:
python2.7 main.py 100 counter.txt > run_log.txt
Run Code Online (Sandbox Code Playgroud)
main.py:
#!/usr/bin/python2.7
import os, sys, signal, time
import Queue
import multiprocessing as mp
def eat_queue(job_queue, result_queue):
"""Eats input queue, feeds output queue
"""
proc_name = mp.current_process().name
while True:
try:
job = job_queue.get(block=False)
if job == None:
print(proc_name + " DONE")
return
result_queue.put(execute(job))
except Queue.Empty:
pass
def execute(x):
"""Does the computation on the input data
"""
return x*x
def save_result(result):
"""Saves results in a list
"""
result_list.append(result)
def load(ifilename):
"""Generator reading the input file and
yielding it row by row
"""
ifile = open(ifilename, "r")
for line in ifile:
line = line.strip()
num = int(line)
yield (num)
ifile.close()
print("file closed".upper())
def put_tasks(job_queue, ifilename):
"""Feeds the job queue
"""
for item in load(ifilename):
job_queue.put(item)
for _ in range(get_max_workers()):
job_queue.put(None)
def get_max_workers():
"""Returns optimal number of processes to run
"""
max_workers = mp.cpu_count() - 2
if max_workers < 1:
return 1
return max_workers
def run(workers_num, ifilename):
job_queue = mp.Queue()
result_queue = mp.Queue()
# decide how many processes are to be created
max_workers = get_max_workers()
print "processes available: %d" % max_workers
if workers_num < 1 or workers_num > max_workers:
workers_num = max_workers
workers_list = []
# a process for feeding job queue with the input file
task_gen = mp.Process(target=put_tasks, name="task_gen",
args=(job_queue, ifilename))
workers_list.append(task_gen)
for i in range(workers_num):
tmp = mp.Process(target=eat_queue, name="w%d" % (i+1),
args=(job_queue, result_queue))
workers_list.append(tmp)
for worker in workers_list:
worker.start()
for worker in workers_list:
worker.join()
print "worker %s finished!" % worker.name
if __name__ == '__main__':
result_list = []
args = sys.argv
workers_num = int(args[1])
ifilename = args[2]
run(workers_num, ifilename)
Run Code Online (Sandbox Code Playgroud)
这是因为代码中没有任何内容可以取消 任何内容result_queue.然后行为取决于内部队列缓冲细节:如果"不是很多"数据正在等待,一切看起来都很好,但如果"大量"数据正在等待,一切都会冻结.没有更多可以说,因为它涉及内部魔法层;-)但文档确实警告它:
警告
如上所述,如果子进程已将项目放在队列中(并且它未使用JoinableQueue.cancel_join_thread),则在将所有缓冲的项目都刷新到管道之前,该进程不会终止.
这意味着,如果您尝试加入该进程,则可能会遇到死锁,除非您确定已经使用了已放入队列的所有项目.类似地,如果子进程是非守护进程,则父进程在尝试加入其所有非守护进程子进程时可能会在退出时挂起.
请注意,使用管理器创建的队列没有此问题.请参阅编程指南.
一种简单的修复方法:首先添加
result_queue.put(None)
Run Code Online (Sandbox Code Playgroud)
在eat_queue()返回之前.然后加:
count = 0
while count < workers_num:
if result_queue.get() is None:
count += 1
Run Code Online (Sandbox Code Playgroud)
在主程序之前.join()的工人.排出结果队列,然后一切都干净利落.
顺便说一句,这段代码很奇怪:
while True:
try:
job = job_queue.get(block=False)
if job == None:
print(proc_name + " DONE")
return
result_queue.put(execute(job))
except Queue.Empty:
pass
Run Code Online (Sandbox Code Playgroud)
你为什么要做非阻塞get()?只要队列为空,这就会变成CPU占用"忙循环".主要观点.get()是提供一种等待工作出现的有效方法.所以:
while True:
job = job_queue.get()
if job is None:
print(proc_name + " DONE")
break
else:
result_queue.put(execute(job))
result_queue.put(None)
Run Code Online (Sandbox Code Playgroud)
做同样的事情,但效率更高.
队列大小谨慎
你没有问过这个问题,但是在它咬你之前让它覆盖它;-)默认情况下,它Queue的大小没有限制.例如,如果您添加了十亿个项目Queue,它将需要足够的RAM来容纳十亿个项目.因此,如果您的生产者可以比您的消费者处理它们更快地生成工作项,那么内存使用可能会很快失控.
幸运的是,这很容易修复:指定最大队列大小.例如,
job_queue = mp.Queue(maxsize=10*workers_num)
^^^^^^^^^^^^^^^^^^^^^^^
Run Code Online (Sandbox Code Playgroud)
然后job_queue.put(some_work_item)将阻塞,直到消费者将队列的大小减小到小于最大值.这样,您可以处理需要简单RAM的队列的大量问题.
| 归档时间: |
|
| 查看次数: |
1866 次 |
| 最近记录: |