Cha*_*ois 2 python locking atexit python-multithreading python-3.x
我正在玩 python 3.7.4 上的线程,我想用它atexit来注册一个将(干净地)终止线程的清理函数。
例如:
# example.py
import threading
import queue
import atexit
import sys
Terminate = object()
class Worker(threading.Thread):
def __init__(self):
super().__init__()
self.queue = queue.Queue()
def send_message(self, m):
self.queue.put_nowait(m)
def run(self):
while True:
m = self.queue.get()
if m is Terminate:
break
else:
print("Received message: ", m)
def shutdown_threads(threads):
for t in threads:
print(f"Terminating thread {t}")
t.send_message(Terminate)
for t in threads:
print(f"Joining on thread {t}")
t.join()
else:
print("All threads terminated")
if __name__ == "__main__":
threads = [
Worker()
for _ in range(5)
]
atexit.register(shutdown_threads, threads)
for t in threads:
t.start()
for t in threads:
t.send_message("Hello")
#t.send_message(Terminate)
sys.exit(0)
Run Code Online (Sandbox Code Playgroud)
但是,似乎与atexit回调中的线程和队列交互会与一些内部关闭例程产生死锁:
# example.py
import threading
import queue
import atexit
import sys
Terminate = object()
class Worker(threading.Thread):
def __init__(self):
super().__init__()
self.queue = queue.Queue()
def send_message(self, m):
self.queue.put_nowait(m)
def run(self):
while True:
m = self.queue.get()
if m is Terminate:
break
else:
print("Received message: ", m)
def shutdown_threads(threads):
for t in threads:
print(f"Terminating thread {t}")
t.send_message(Terminate)
for t in threads:
print(f"Joining on thread {t}")
t.join()
else:
print("All threads terminated")
if __name__ == "__main__":
threads = [
Worker()
for _ in range(5)
]
atexit.register(shutdown_threads, threads)
for t in threads:
t.start()
for t in threads:
t.send_message("Hello")
#t.send_message(Terminate)
sys.exit(0)
Run Code Online (Sandbox Code Playgroud)
(这KeyboardInterrupt是我使用的,ctrl-c因为该过程似乎无限期地挂起)。
但是,如果我Terminate在退出之前发送消息(取消注释之后的行t.send_message("Hello")),程序不会挂起并正常终止:
$ python example.py
Received message: Hello
Received message: Hello
Received message: Hello
Received message: Hello
Received message: Hello
^CException ignored in: <module 'threading' from '/usr/lib64/python3.7/threading.py'>
Traceback (most recent call last):
File "/usr/lib64/python3.7/threading.py", line 1308, in _shutdown
lock.acquire()
KeyboardInterrupt
Terminating thread <Worker(Thread-1, started 140612492904192)>
Terminating thread <Worker(Thread-2, started 140612484511488)>
Terminating thread <Worker(Thread-3, started 140612476118784)>
Terminating thread <Worker(Thread-4, started 140612263212800)>
Terminating thread <Worker(Thread-5, started 140612254820096)>
Joining on thread <Worker(Thread-1, stopped 140612492904192)>
Joining on thread <Worker(Thread-2, stopped 140612484511488)>
Joining on thread <Worker(Thread-3, stopped 140612476118784)>
Joining on thread <Worker(Thread-4, stopped 140612263212800)>
Joining on thread <Worker(Thread-5, stopped 140612254820096)>
All threads terminated
Run Code Online (Sandbox Code Playgroud)
这就引出了一个问题threading._shutdown,相对于atexit处理程序,该例程何时执行?与atexit处理程序中的线程交互是否有意义?
您可以使用一个守护线程来要求您的非守护线程优雅地清理。例如,这是必要的,如果您使用的是启动非守护进程线程的第三方库,则必须更改该库或执行以下操作:
import threading
def monitor_thread():
main_thread = threading.main_thread()
main_thread.join()
send_signal_to_non_daemon_thread_to_gracefully_shutdown()
monitor = threading.Thread(target=monitor_thread)
monitor.daemon = True
monitor.start()
start_non_daemon_thread()
Run Code Online (Sandbox Code Playgroud)
把它放在原始海报代码的上下文中(注意我们不需要 atexit 函数,因为在所有非守护线程停止之前不会调用它):
if __name__ == "__main__":
threads = [
Worker()
for _ in range(5)
]
for t in threads:
t.start()
for t in threads:
t.send_message("Hello")
#t.send_message(Terminate)
def monitor_thread():
main_thread = threading.main_thread()
main_thread.join()
shutdown_threads(threads)
monitor = threading.Thread(target=monitor_thread)
monitor.daemon = True
monitor.start()
Run Code Online (Sandbox Code Playgroud)