python 多处理队列 put() 和 get() 的奇怪阻塞行为

evo*_*ion 5 python linux queue multiprocessing blocking

我在python 2.7(在linux下)编写了一个类,它使用多个进程异步操作数据库。我遇到当使用一个很奇怪的阻塞行为multiprocessing.Queue.put()multiprocessing.Queue.get()我无法解释。

这是我所做的简化版本:

from multiprocessing import Process, Queue

class MyDB(object):  

    def __init__(self):
        self.inqueue = Queue()

        p1 = Process(target = self._worker_process, kwargs={"inqueue": self.inqueue})
        p1.daemon = True
        started = False
        while not started:
            try:
                p1.start()
                started = True
            except:
                time.sleep(1)  

        #Sometimes I start a same second process but it makes no difference to my problem 
        p2 = Process(target = self._worker_process, kwargs={"inqueue": self.inqueue})
        #blahblah... (same as above)

    @staticmethod
    def _worker_process(inqueue):
        while True:
            #--------------this blocks depite data having arrived------------

            op = inqueue.get(block = True)
            #do something with specified operation

            #---------------problem area end--------------------
            print "if this text gets printed, the problem was solved"

    def delete_parallel(self, key, rawkey = False):
        someid = ...blahblah
        #--------------this section blocked when I was posting the question but for unknown reasons it's fine now
        self.inqueue.put({"optype": "delete", "kwargs": {"key":key, "rawkey":rawkey}, "callid": someid}, block = True) 
        #--------------problem area end----------------
        print "if you see this text, there was no blocking or block was released"
Run Code Online (Sandbox Code Playgroud)

如果我运行上面的代码测试中(在我称之为delete_parallel的上MyDB对象),然后一切正常,但如果我在我的整个应用程序的情况下运行(输入其他的东西,包容的PyGTK)奇怪的事情发生了:

出于某种原因self.inqueue.get,尽管self.inqueue其缓冲区中有数据,但仍会阻塞并且永远不会释放。当我改为调用时self.inqueue.get(block = False, timeout = 1),调用通过引发 Queue.Empty 完成,尽管队列包含数据。qsize()返回 1(表明有数据)而empty()返回 True(表明没有数据)。

现在很明显,我的应用程序中必须有其他地方self.inqueue通过导致获取某些内部信号量而无法使用。但是我不知道该找什么。一旦到达阻塞信号量,Eclipse 调试就变得无用了。

编辑8(清理并总结我之前的编辑)上次遇到类似问题,结果是pygtk劫持了全局解释器锁,但是我gobject.threads_init()在调用其他任何东西之前通过调用解决了它。这个问题可能相关吗?

当我print "successful reception"get()方法之后引入 a并在终端中执行我的应用程序时,首先会发生相同的行为。当我然后按 CTRL+DI 终止时,突然在消息之间得到字符串“成功接收”。在我看来,这就像其他一些进程/线程被终止并释放了阻止卡在get().

由于卡住的进程稍后终止,我仍然看到该消息。什么样的过程可以从外部弄乱像这样的队列?self.inqueue只能在我的班级内访问。

现在它似乎归结为这个队列,尽管数据在那里,但它不会返回任何东西:

缺陷队列

get()方法在尝试从某个内部管道接收实际数据时似乎卡住了。我的调试器挂起之前的最后一行是:

res = self._recv()
Run Code Online (Sandbox Code Playgroud)

这是在multiprocessing.queues.get() 跟踪这个内部 python 内容的内部,我找到了作业 self._recv = self._reader.recvself._reader, self._writer = Pipe(duplex=False).

编辑 9 我目前正试图寻找导致它的导入。我的应用程序非常复杂,有数百个类,每个类都导入了许多其他类,所以这是一个非常痛苦的过程。我找到了第一个候选类,MyDB当我跟踪它的所有导入时,它使用 3 个不同的实例(但MyDB.inqueue据我所知,它在任何时候都不能访问)。奇怪的是,它基本上只是一个包装器,而被包装的类在单独导入时工作得很好。这也意味着它可以在MyDB不冻结的情况下使用。一旦我导入包装器(它导入那个类),我就会遇到阻塞问题。

我开始通过逐渐重用旧代码来重写包装器。每次我引入几行新行时,我都会进行测试,直到我希望看到哪一行会导致问题再次出现。

Dun*_*nes 1

queue.Queue使用内部线程来维护其状态。如果您使用 GTK 那么它会破坏这些线程。所以你需要打电话gobject.init_threads()

应该注意的是,qsize()仅返回队列的大概大小。实际大小可以是 0 和 返回值之间的任意值qsize()