两个python池,两个队列

use*_*040 1 python queue pool

我试图了解池和队列如何在Python中工作,并且以下示例未按预期工作。我希望程序结束,但是由于第二个队列没有清空,它陷入了无限循环。

import multiprocessing
import os
import time

inq = multiprocessing.Queue()
outq = multiprocessing.Queue()

def worker_main(q1, q2):
    while True:
        i = q1.get(True)
        time.sleep(.1) 
        q2.put(i*2)

def worker2(q):
    print q.get(True)

p1 = multiprocessing.Pool(3, worker_main,(inq, outq,))
p2 = multiprocessing.Pool(2, worker2,(outq,))


for i in range(50):
    inq.put(i)


while inq.qsize()>0 or outq.qsize()>0:
    print 'q1 size', inq.qsize(), 'q2 size', outq.qsize()
    time.sleep(.1)
Run Code Online (Sandbox Code Playgroud)

输出显示第二个队列(outq)为.get一次,仅此而已。

输出:

q1 size 49 q2 size 0
q1 size 47 q2 size 0
2
4
q1 size 44 q2 size 1
q1 size 41 q2 size 4
q1 size 38 q2 size 7
q1 size 35 q2 size 11
q1 size 31 q2 size 14
q1 size 27 q2 size 18
q1 size 24 q2 size 21
q1 size 22 q2 size 23
q1 size 19 q2 size 26
q1 size 15 q2 size 30
q1 size 12 q2 size
Run Code Online (Sandbox Code Playgroud)

为什么在outq为空之前不给worker2打电话?

Tim*_*ers 5

This is a very odd way to use a Pool. The function passed to the constructor is called only once per process in the pool. It's intended for one-time initialization tasks, and is rarely used.

As is, your worker2 is called exactly twice, one time for each process in yout p2 pool. Your function gets one value from a queue and then exits. The process never does anything else. So it's doing exactly what you coded it to do.

There's no evident reason to use a Pool here at all; creating 5 multiprocessing.Process objects instead would be more natural.

If you feel you have to do it this way, then you need to put a loop in worker2. Here's one way:

import multiprocessing
import time

def worker_main(q1, q2):
    while True:
        i = q1.get()
        if i is None:
            break
        time.sleep(.1) 
        q2.put(i*2)

def worker2(q):
    while True:
        print(q.get())

if __name__ == "__main__":
    inq = multiprocessing.Queue()
    outq = multiprocessing.Queue()
    p1 = multiprocessing.Pool(3, worker_main,(inq, outq,))
    p2 = multiprocessing.Pool(2, worker2,(outq,))

    for i in range(50):
        inq.put(i)
    for i in range(3): # tell worker_main we're done
        inq.put(None)

    while inq.qsize()>0 or outq.qsize()>0:
        print('q1 size', inq.qsize(), 'q2 size', outq.qsize())
        time.sleep(.1)
Run Code Online (Sandbox Code Playgroud)

SUGGESTED

This is a "more natural" way to use Process objects instead, and using queue sentinels (special values - here None) to let processes know when to stop. BTW, I'm using Python 3, so use print as a function rather than as a statement.

import multiprocessing as mp
import time

def worker_main(q1, q2):
    while True:
        i = q1.get()
        if i is None:
            break
        time.sleep(.1) 
        q2.put(i*2)

def worker2(q):
    while True:
        i = q.get()
        if i is None:
            break
        print(i)

def wait(procs):
    alive_count = len(procs)
    while alive_count:
        alive_count = 0
        for p in procs:
            if p.is_alive():
                p.join(timeout=0.1)
                print('q1 size', inq.qsize(), 'q2 size', outq.qsize())
                alive_count += 1

if __name__ == "__main__":
    inq = mp.Queue()
    outq = mp.Queue()
    p1s = [mp.Process(target=worker_main, args=(inq, outq,))
           for i in range(3)]
    p2s = [mp.Process(target=worker2, args=(outq,))
           for i in range(2)]
    for p in p1s + p2s:
        p.start()

    for i in range(50):
        inq.put(i)
    for p in p1s: # tell worker_main we're done
        inq.put(None)

    wait(p1s)
    # Tell worker2 we're done
    for p in p2s:
        outq.put(None)
    wait(p2s)
Run Code Online (Sandbox Code Playgroud)