Del*_*gan 2 python queue multithreading multiprocessing python-3.x
我很难理解为什么这个简单的程序最后会引发一个错误EOFError。
我正在使用与Queue()进行通讯,Thread()因此我想自动干净地终止atexit程序。
import threading
import multiprocessing
import atexit
class MyClass:
def __init__(self):
self.queue = None
self.thread = None
def start(self):
self.queue = multiprocessing.Queue()
self.thread = threading.Thread(target=self.queued_writer, daemon=True)
self.thread.start()
# Remove this: no error
self.queue.put("message")
def queued_writer(self):
while 1:
msg = self.queue.get()
print("Message:", msg)
if msg is None:
break
def stop(self):
self.queue.put(None)
self.thread.join()
instance = MyClass()
atexit.register(instance.stop)
# Put this before register: no error
instance.start()
Run Code Online (Sandbox Code Playgroud)
这引起了:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "test.py", line 21, in queued_writer
msg = self.queue.get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 94, in get
res = self._recv_bytes()
File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 383, in _recv
raise EOFError
EOFError
Run Code Online (Sandbox Code Playgroud)
此外,此代码段的行为也很奇怪:如果我删除该self.queue.put("message")行,则不会引发任何错误,并且线程会成功退出。同样,如果在instance.start()is之前调用,这似乎可以工作atexit.register()。
有谁知道错误从哪里来?
编辑:我注意到使用SimpleQueue()似乎可以使错误消失。
问题来自多个atexit.register()呼叫之间的冲突。
该文档指出:
atexit以注册时的相反顺序运行这些功能;如果你注册A,B以及C在解释终止时间,他们将在订单中运行C,B,A。[...]
假设通常将在较低级别的模块之前导入较低级别的模块,因此必须稍后对其进行清理。
通过首先导入multiprocessing然后调用atexit.register(my_stop),您可以期望您的stop函数在任何内部终止过程之前执行...但是情况并非如此,因为它atexit.register()可能是动态调用的。
在当前情况下,该multiprocessing库利用了一个_exit_function旨在彻底关闭内部线程和队列的函数。此功能atexit 在模块级别注册,但是仅在Queue()初始化对象后才加载模块。
因此,MyClassstop函数在的函数之前注册multiprocessing,因此在之后instance.stop调用。 _exit_function
在终止期间,将_exit_function关闭内部管道连接,因此,如果线程稍后尝试.get()使用已关闭的读取连接进行调用,EOFError则会引发an 。仅当Python没有时间daemon在末尾自动杀死线程时才会发生这种情况,也就是说,如果注册了“慢速”退出函数(例如time.sleep(0.1),在这种情况下为thread.join()),并在通常的关闭过程之后运行。由于某些原因,写入连接关闭会延迟,因此.put()不会立即引发错误。
至于为什么对代码段进行一些小的修改就可以使其起作用:SimpleQueue尚无此功能,Finalizer因此稍后会关闭内部管道。在调用Queue第一个之前,不会启动的内部线程,.put()因此将其删除意味着没有管道要关闭。也可以通过导入来强制注册multiprocessing.queues。
| 归档时间: |
|
| 查看次数: |
1570 次 |
| 最近记录: |