我试图拆分循环即
N = 1000000
for i in xrange(N):
#do something
Run Code Online (Sandbox Code Playgroud)
使用multiprocessing.Process并且它适用于N的小值.当我使用更大的N值时会出现问题.在p.join()之前或期间发生奇怪的事情并且程序没有响应.如果我把print i,而不是q.put(i)放在函数f的定义中,一切都运行良好.
我将不胜感激任何帮助.这是代码.
from multiprocessing import Process, Queue
def f(q,nMin, nMax): # function for multiprocessing
for i in xrange(nMin,nMax):
q.put(i)
if __name__ == '__main__':
nEntries = 1000000
nCpu = 10
nEventsPerCpu = nEntries/nCpu
processes = []
q = Queue()
for i in xrange(nCpu):
processes.append( Process( target=f, args=(q,i*nEventsPerCpu,(i+1)*nEventsPerCpu) ) )
for p in processes:
p.start()
for p in processes:
p.join()
print q.qsize()
Run Code Online (Sandbox Code Playgroud) 在 Python 2.7 中,我实现了一个具有多个队列和消费者的多处理场景。简化的想法是,我有一个作业的生产者,这些作业被提供给消费者,处理作业和一个错误处理程序,它负责所有的日志记录。非常简化,看起来都可以与之媲美:
import multiprocessing as mp
import Queue
job_queue = mp.Queue()
error_queue = mp.Queue()
for i in range(10):
job_queue.put(i)
def job_handler(job_queue, error_queue):
print 'Job handler'
while True:
try:
element = job_queue.get_nowait()
print element
except:
# t1
error_queue.put('Error')
error_queue.close()
error_queue.join_thread()
job_queue.close()
job_queue.join_thread()
# t2
return 1
def error_handler(error_queue):
result = error_queue.get()
if result == 'Error':
error_queue.close()
error_queue.join_thread()
if __name__ == '__main__':
print 'Starting'
p1 = mp.Process(target = error_handler, args = (error_queue, ))
p1.start()
p2 = mp.Process(target = job_handler, args …Run Code Online (Sandbox Code Playgroud) 我正在使用multiprocessingpython 库生成 4 个Process()对象来并行化 CPU 密集型任务。任务(来自这篇伟大文章的灵感和代码)是计算列表中每个整数的质因数。
主要.py:
import random
import multiprocessing
import sys
num_inputs = 4000
num_procs = 4
proc_inputs = num_inputs/num_procs
input_list = [int(1000*random.random()) for i in xrange(num_inputs)]
output_queue = multiprocessing.Queue()
procs = []
for p_i in xrange(num_procs):
print "Process [%d]"%p_i
proc_list = input_list[proc_inputs * p_i:proc_inputs * (p_i + 1)]
print " - num inputs: [%d]"%len(proc_list)
# Using target=worker1 HANGS on join
p = multiprocessing.Process(target=worker1, args=(p_i, proc_list, output_queue))
# Using target=worker2 RETURNS with …Run Code Online (Sandbox Code Playgroud)