我试图了解池和队列如何在Python中工作,并且以下示例未按预期工作。我希望程序结束,但是由于第二个队列没有清空,它陷入了无限循环。
import multiprocessing
import os
import time
inq = multiprocessing.Queue()
outq = multiprocessing.Queue()
def worker_main(q1, q2):
while True:
i = q1.get(True)
time.sleep(.1)
q2.put(i*2)
def worker2(q):
print q.get(True)
p1 = multiprocessing.Pool(3, worker_main,(inq, outq,))
p2 = multiprocessing.Pool(2, worker2,(outq,))
for i in range(50):
inq.put(i)
while inq.qsize()>0 or outq.qsize()>0:
print 'q1 size', inq.qsize(), 'q2 size', outq.qsize()
time.sleep(.1)
Run Code Online (Sandbox Code Playgroud)
输出显示第二个队列(outq)为.get一次,仅此而已。
输出:
Run Code Online (Sandbox Code Playgroud)q1 size 49 q2 size 0 q1 size 47 q2 size 0 2 4 q1 size 44 q2 size 1 q1 size 41 q2 size 4 q1 size 38 q2 size 7 q1 size 35 q2 size 11 q1 size 31 q2 size 14 q1 size 27 q2 size 18 q1 size 24 q2 size 21 q1 size 22 q2 size 23 q1 size 19 q2 size 26 q1 size 15 q2 size 30 q1 size 12 q2 size
为什么在outq为空之前不给worker2打电话?
This is a very odd way to use a Pool
. The function passed to the constructor is called only once per process in the pool. It's intended for one-time initialization tasks, and is rarely used.
As is, your worker2
is called exactly twice, one time for each process in yout p2
pool. Your function gets one value from a queue and then exits. The process never does anything else. So it's doing exactly what you coded it to do.
There's no evident reason to use a Pool
here at all; creating 5 multiprocessing.Process
objects instead would be more natural.
If you feel you have to do it this way, then you need to put a loop in worker2
. Here's one way:
import multiprocessing
import time
def worker_main(q1, q2):
while True:
i = q1.get()
if i is None:
break
time.sleep(.1)
q2.put(i*2)
def worker2(q):
while True:
print(q.get())
if __name__ == "__main__":
inq = multiprocessing.Queue()
outq = multiprocessing.Queue()
p1 = multiprocessing.Pool(3, worker_main,(inq, outq,))
p2 = multiprocessing.Pool(2, worker2,(outq,))
for i in range(50):
inq.put(i)
for i in range(3): # tell worker_main we're done
inq.put(None)
while inq.qsize()>0 or outq.qsize()>0:
print('q1 size', inq.qsize(), 'q2 size', outq.qsize())
time.sleep(.1)
Run Code Online (Sandbox Code Playgroud)
This is a "more natural" way to use Process
objects instead, and using queue sentinels (special values - here None
) to let processes know when to stop. BTW, I'm using Python 3, so use print
as a function rather than as a statement.
import multiprocessing as mp
import time
def worker_main(q1, q2):
while True:
i = q1.get()
if i is None:
break
time.sleep(.1)
q2.put(i*2)
def worker2(q):
while True:
i = q.get()
if i is None:
break
print(i)
def wait(procs):
alive_count = len(procs)
while alive_count:
alive_count = 0
for p in procs:
if p.is_alive():
p.join(timeout=0.1)
print('q1 size', inq.qsize(), 'q2 size', outq.qsize())
alive_count += 1
if __name__ == "__main__":
inq = mp.Queue()
outq = mp.Queue()
p1s = [mp.Process(target=worker_main, args=(inq, outq,))
for i in range(3)]
p2s = [mp.Process(target=worker2, args=(outq,))
for i in range(2)]
for p in p1s + p2s:
p.start()
for i in range(50):
inq.put(i)
for p in p1s: # tell worker_main we're done
inq.put(None)
wait(p1s)
# Tell worker2 we're done
for p in p2s:
outq.put(None)
wait(p2s)
Run Code Online (Sandbox Code Playgroud)