vor*_*192 7 python queue priority-queue multiprocessing
正如您从标题中所知,我正在尝试使用PriorityQueue进行多处理.更确切地说,我想制作共享的PriorityQueue,编写了一些代码并且它没有像我预期的那样运行.
看看代码:
import time
from multiprocessing import Process, Lock
from Queue import PriorityQueue
def worker(queue):
    lock = Lock()
    with lock:
        for i in range(100):
            queue.put(i)
    print "worker", queue.qsize()
pr_queue = PriorityQueue()
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()
time.sleep(5)    # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()
得到以下输出:
worker 100
main 0
发生了什么以及如何以正确的方式做我想做的事情?谢谢.
dan*_*ano 11
问题不在于它在这种情况下是不可选择的 - 如果你使用的是类Unix平台,队列可以传递给孩子而不进行酸洗.(在Windows上,我认为你会在这里遇到酸洗错误).根本问题是您没有使用进程安全队列.即可以在进程间所使用的唯一队列的Queue对象住内部multiprocessing模块.不幸的是,没有PriorityQueue可用的实现.但是,您可以通过PriorityQueue在multiprocessing.Manager类中注册a来轻松创建一个,如下所示:
import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from Queue import PriorityQueue
class MyManager(SyncManager):
    pass
MyManager.register("PriorityQueue", PriorityQueue)  # Register a shared PriorityQueue
def Manager():
    m = MyManager()
    m.start()
    return m
def worker(queue):
    print(queue)
    for i in range(100):
        queue.put(i)
    print "worker", queue.qsize()
m = Manager()
pr_queue = m.PriorityQueue()  # This is process-safe
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()
time.sleep(5)    # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()
输出:
worker 100
main 100
请注意,这可能不会像它是标准multiprocessing.Queue子类那样好.该Manager基PriorityQueue是通过创建一个实现Manager它实际上包含了常规的服务器进程PriorityQueue,然后提供你的主要和工作进程与Proxy使用IPC阅读对象/写在服务器进程的队列.常规multiprocessing.Queues只是写入/读取数据Pipe.如果这是一个问题,您可以尝试multiprocessing.PriorityQueue通过子类化或委托来实现自己的multiprocessing.Queue.但是,这可能不值得.