使用多处理队列和线程在程序退出时出现“ EOF错误”

Del*_*gan 2 python queue multithreading multiprocessing python-3.x

我很难理解为什么这个简单的程序最后会引发一个错误EOFError

我正在使用与Queue()进行通讯,Thread()因此我想自动干净地终止atexit程序。

import threading
import multiprocessing
import atexit

class MyClass:

    def __init__(self):
        self.queue = None
        self.thread = None

    def start(self):
        self.queue = multiprocessing.Queue()
        self.thread = threading.Thread(target=self.queued_writer, daemon=True)
        self.thread.start()

        # Remove this: no error
        self.queue.put("message")

    def queued_writer(self):
        while 1:
            msg = self.queue.get()
            print("Message:", msg)
            if msg is None:
                break

    def stop(self):
        self.queue.put(None)
        self.thread.join()

instance = MyClass()

atexit.register(instance.stop)

# Put this before register: no error
instance.start()
Run Code Online (Sandbox Code Playgroud)

这引起了:

Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "test.py", line 21, in queued_writer
    msg = self.queue.get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 94, in get
    res = self._recv_bytes()
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError
Run Code Online (Sandbox Code Playgroud)

此外,此代码段的行为也很奇怪:如果我删除该self.queue.put("message")行,则不会引发任何错误,并且线程会成功退出。同样,如果在instance.start()is之前调用,这似乎可以工作atexit.register()

有谁知道错误从哪里来?

编辑:我注意到使用SimpleQueue()似乎可以使错误消失。

Del*_*gan 5

问题来自多个atexit.register()呼叫之间的冲突。

该文档指出:

atexit以注册时的相反顺序运行这些功能;如果你注册AB以及C在解释终止时间,他们将在订单中运行CBA

[...]

假设通常将在较低级别的模块之前导入较低级别的模块,因此必须稍后对其进行清理。

通过首先导入multiprocessing然后调用atexit.register(my_stop),您可以期望您的stop函数在任何内部终止过程之前执行...但是情况并非如此,因为它atexit.register()可能是动态调用的。

在当前情况下,该multiprocessing库利用了一个_exit_function旨在彻底关闭内部线程和队列的函数。此功能atexit 在模块级别注册,但是仅Queue()初始化对象后才加载模块。

因此,MyClassstop函数的函数之前注册multiprocessing,因此在之后instance.stop调用。 _exit_function

在终止期间,将_exit_function关闭内部管道连接,因此,如果线程稍后尝试.get()使用已关闭的读取连接进行调用,EOFError则会引发an 。仅当Python没有时间daemon在末尾自动杀死线程时才会发生这种情况,也就是说,如果注册了“慢速”退出函数(例如time.sleep(0.1),在这种情况下为thread.join()),并在通常的关闭过程之后运行。由于某些原因,写入连接关闭会延迟,因此.put()不会立即引发错误。

至于为什么对代码段进行一些小的修改就可以使其起作用:SimpleQueue尚无此功能,Finalizer因此稍后会关闭内部管道。在调用Queue第一个之前,不会启动的内部线程,.put()因此将其删除意味着没有管道要关闭。也可以通过导入来强制注册multiprocessing.queues