jwi*_*720 3 python multiprocessing python-2.7
我意识到我可以使用 Pool 类并可能得到我需要的东西,但我想要更好地控制我的问题。我的工作比处理器多,所以我不希望它们同时运行。
例如:
from multiprocessing import Process,cpu_count
for dir_name in directories:
src_dir = os.path.join(top_level,dir_name)
dst_dir = src_dir.replace(args.src_dir,args.target_dir)
p = Process(target=transfer_directory, args=(src_dir, dst_dir,))
p.start()
Run Code Online (Sandbox Code Playgroud)
但是,如果我有超过 16 个目录,那么我将启动比处理器更多的作业。这是我的解决方案,它真的很黑客。
from multiprocessing import Process,cpu_count
jobs = []
for dir_name in directories:
src_dir = os.path.join(top_level,dir_name)
dst_dir = src_dir.replace(args.src_dir,args.target_dir)
p = Process(target=transfer_directory, args=(src_dir, dst_dir,))
jobs.append(p)
alive_jobs = []
while jobs:
if len(alive_jobs) >= cpu_count():
time.sleep(5)
print alive_jobs
for aj in alive_jobs:
if aj.is_alive():
continue
else:
print "job {} removed".format(aj)
alive_jobs.remove(aj)
continue
for job in jobs:
if job.is_alive():
continue
job.start()
alive_jobs.append(job)
print alive_jobs
jobs.remove(job)
if len(alive_jobs) >= cpu_count():
break
Run Code Online (Sandbox Code Playgroud)
使用内置工具有更好的解决方案吗?
我想在这里分享我的想法:创建等于 cpu_count() 的进程数,使用队列存储您的所有目录,并将队列传递到您的transfer_directory方法中,dir_name一旦进程完成其工作,就从队列中取出。草稿如下所示:
NUM_OF_PROCESSES = multiprocessing.cpu_count()
TIME_OUT_IN_SECONDS = 60
for dir_name in directories:
my_queue.put(dir_name)
# creates processes that equals to number of CPU
processes = [multiprocessing.Process(target=transfer_directory, args=(my_queue,)) for x in range(NUM_OF_PROCESSES)]
# starts processes
for p in processes:
p.start()
# blocks the calling thread
for p in processes:
p.join()
def transfer_directory(my_queue):
"""processes element of directory queue if queue is not empty"""
while my_queue is not empty:
dir_name = my_queue.get(timeout=TIME_OUT_IN_SECONDS)
src_dir = os.path.join(top_level,dir_name)
dst_dir = src_dir.replace(args.src_dir,args.target_dir)
Run Code Online (Sandbox Code Playgroud)
编辑:
它对于读取大文件也很有效。
我挣扎如何读取一个巨大的文件(这是超过1000万线),使用multiprocessing了一段时间,最后我用一个单一的过程开始producer(a_queue),只是读取和放线到队列中,然后启动多个consumers(a_queue)从拿线a_queue和做耗时的工作,它对我来说很正常。