具有大量数据的 Multiprocessing.Queue 会导致 _wait_for_tstate_lock

buh*_*htz 9 python queue python-multithreading python-3.x python-multiprocessing

当我在 a和viathreading._wait_for_tstate_lock之间传输大量数据时,会引发异常。ProcessThreadmultiprocessing.Queue

我的最小工作示例首先看起来有点复杂 - 抱歉。我会解释。原始应用程序将大量(不那么重要)文件加载到 RAM 中。这是在单独的进程中完成的,以节省资源。主 GUI 线程不应冻结。

  1. GUI 启动一个单独的进程Thread以防止 gui 事件循环冻结。

  2. Thread然后,这个单独的任务将启动一个Process应该完成工作的任务。

a) 这Thread实例化了 a multiprocess.Queue(注意这是 amultiprocessing而不是threading!)

b) 这是为了从回Process共享数据。ProcessThread

  1. Process了一些工作(3 个步骤)并将.put()结果放入multiprocessing.Queue.

  2. Process结束时Thread,再次接管并收集来自的数据Queue,将其存储到自己的属性中MyThread.result

  3. 告诉ThreadGUI 主循环/线程在有时间的情况下调用回调函数。

  4. 回调函数 ( MyWindow::callback_thread_finished()) 从 中获取结果MyWindow.thread.result

问题是,如果放入的数据Queue发生了很大的事情,我不明白 -MyThread永远不会结束。我必须通过 Strg+C 取消该应用程序。

我从文档中得到了一些提示。但我的问题是我没有完全理解文档。但我有一种感觉,我的问题的关键就在那里。请参阅“管道和队列”(Python 3.5 文档)中的两个红色方框。这就是完整的输出

MyWindow::do_start()
Running MyThread...
Running MyProcess...
MyProcess stoppd.
^CProcess MyProcess-1:
Exception ignored in: <module 'threading' from '/usr/lib/python3.5/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 1288, in _shutdown
    t.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
    _run_finalizers()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
    thread.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Run Code Online (Sandbox Code Playgroud)

这是最小的工作示例

#!/usr/bin/env python3

import multiprocessing
import threading
import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib


class MyThread (threading.Thread):
    """This thread just starts the process."""
    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback

    def run(self):
        print('Running MyThread...')
        self.result = []

        queue = multiprocessing.Queue()
        process = MyProcess(queue)
        process.start()
        process.join()

        while not queue.empty():
            process_result = queue.get()
            self.result.append(process_result)
        print('MyThread stoppd.')
        GLib.idle_add(self._callback)


class MyProcess (multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        print('Running MyProcess...')
        for i in range(3):
            self.queue.put((i, 'x'*102048))
        print('MyProcess stoppd.')

class MyWindow (Gtk.Window):
    def __init__(self):
        Gtk.Window.__init__(self)
        self.connect('destroy', Gtk.main_quit)
        GLib.timeout_add(2000, self.do_start)

    def do_start(self):
        print('MyWindow::do_start()')
        # The process need to be started from a separate thread
        # to prevent the main thread (which is the gui main loop)
        # from freezing while waiting for the process result.
        self.thread = MyThread(self.callback_thread_finished)
        self.thread.start()

    def callback_thread_finished(self):
        result = self.thread.result
        for r in result:
            print('{} {}...'.format(r[0], r[1][:10]))

if __name__ == '__main__':
    win = MyWindow()
    win.show_all()
    Gtk.main()
Run Code Online (Sandbox Code Playgroud)

可能重复但完全不同,IMO 没有回答我的情况:Thread._wait_for_tstate_lock() never returns

解决方法

使用Manager通过修改第22行来queue = multiprocessing.Manager().Queue()解决问题。但我不知道为什么。我问这个问题的目的是为了了解背后的事情,而不仅仅是让我的代码工作。即使我真的不知道 aManager()是什么以及它是否有其他(引起问题的)影响。

Bla*_*ack 6

根据您链接到的文档中的第二个警告框,当您在处理队列中的所有项目之前加入进程时,可能会出现死锁。因此,启动进程并立即加入它,然后处理队列中的项目是错误的步骤顺序。您必须启动该流程,然后接收项目,然后只有当收到所有项目时,您才能调用 join 方法。定义一些哨兵值来表示进程已完成通过队列发送数据。None例如,如果这不是您期望从流程中获得的常规值。

class MyThread(threading.Thread):
    """This thread just starts the process."""

    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback
        self.result = []

    def run(self):
        print('Running MyThread...')
        queue = multiprocessing.Queue()
        process = MyProcess(queue)
        process.start()
        while True:
            process_result = queue.get()
            if process_result is None:
                break
            self.result.append(process_result)
        process.join()
        print('MyThread stoppd.')
        GLib.idle_add(self._callback)


class MyProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        print('Running MyProcess...')
        for i in range(3):
            self.queue.put((i, 'x' * 102048))
        self.queue.put(None)
        print('MyProcess stoppd.')
Run Code Online (Sandbox Code Playgroud)

  • 不,它不会浪费资源,因为“queue.get()”会阻塞,直到队列中确实有东西为止。这不是一个繁忙的循环。它对队列中的每个项目只运行一次。 (2认同)