使用 atexit 终止线程时脚本卡在退出时

Cha*_*ois 2 python locking atexit python-multithreading python-3.x

我正在玩 python 3.7.4 上的线程,我想用它atexit来注册一个将(干净地)终止线程的清理函数。

例如:

# example.py
import threading
import queue
import atexit
import sys

Terminate = object()

class Worker(threading.Thread):
    def __init__(self):
        super().__init__()
        self.queue = queue.Queue()

    def send_message(self, m):
        self.queue.put_nowait(m)

    def run(self):
        while True:
            m = self.queue.get()
            if m is Terminate:
                break
            else:
                print("Received message: ", m)


def shutdown_threads(threads):
    for t in threads:
        print(f"Terminating thread {t}")
        t.send_message(Terminate)
    for t in threads:
        print(f"Joining on thread {t}")
        t.join()
    else:
        print("All threads terminated")

if __name__ == "__main__":
    threads = [
        Worker()
        for _ in range(5)
    ]
    atexit.register(shutdown_threads, threads)

    for t in threads:
        t.start()

    for t in threads:
        t.send_message("Hello")
        #t.send_message(Terminate)

    sys.exit(0)
Run Code Online (Sandbox Code Playgroud)

但是,似乎与atexit回调中的线程和队列交互会与一些内部关闭例程产生死锁:

# example.py
import threading
import queue
import atexit
import sys

Terminate = object()

class Worker(threading.Thread):
    def __init__(self):
        super().__init__()
        self.queue = queue.Queue()

    def send_message(self, m):
        self.queue.put_nowait(m)

    def run(self):
        while True:
            m = self.queue.get()
            if m is Terminate:
                break
            else:
                print("Received message: ", m)


def shutdown_threads(threads):
    for t in threads:
        print(f"Terminating thread {t}")
        t.send_message(Terminate)
    for t in threads:
        print(f"Joining on thread {t}")
        t.join()
    else:
        print("All threads terminated")

if __name__ == "__main__":
    threads = [
        Worker()
        for _ in range(5)
    ]
    atexit.register(shutdown_threads, threads)

    for t in threads:
        t.start()

    for t in threads:
        t.send_message("Hello")
        #t.send_message(Terminate)

    sys.exit(0)
Run Code Online (Sandbox Code Playgroud)

(这KeyboardInterrupt是我使用的,ctrl-c因为该过程似乎无限期地挂起)。

但是,如果我Terminate在退出之前发送消息(取消注释之后的行t.send_message("Hello")),程序不会挂起并正常终止:

$ python example.py
Received message:  Hello
Received message:  Hello
Received message:  Hello
Received message:  Hello
Received message:  Hello
^CException ignored in: <module 'threading' from '/usr/lib64/python3.7/threading.py'>
Traceback (most recent call last):
  File "/usr/lib64/python3.7/threading.py", line 1308, in _shutdown
    lock.acquire()
KeyboardInterrupt
Terminating thread <Worker(Thread-1, started 140612492904192)>
Terminating thread <Worker(Thread-2, started 140612484511488)>
Terminating thread <Worker(Thread-3, started 140612476118784)>
Terminating thread <Worker(Thread-4, started 140612263212800)>
Terminating thread <Worker(Thread-5, started 140612254820096)>
Joining on thread <Worker(Thread-1, stopped 140612492904192)>
Joining on thread <Worker(Thread-2, stopped 140612484511488)>
Joining on thread <Worker(Thread-3, stopped 140612476118784)>
Joining on thread <Worker(Thread-4, stopped 140612263212800)>
Joining on thread <Worker(Thread-5, stopped 140612254820096)>
All threads terminated
Run Code Online (Sandbox Code Playgroud)

这就引出了一个问题threading._shutdown,相对于atexit处理程序,该例程何时执行?与atexit处理程序中的线程交互是否有意义?

gar*_*on4 5

您可以使用一个守护线程来要求您的非守护线程优雅地清理。例如,这是必要的,如果您使用的是启动非守护进程线程的第三方库,则必须更改该库或执行以下操作:

import threading

def monitor_thread():
    main_thread = threading.main_thread()
    main_thread.join()
    send_signal_to_non_daemon_thread_to_gracefully_shutdown()


monitor = threading.Thread(target=monitor_thread)
monitor.daemon = True
monitor.start()

start_non_daemon_thread()
Run Code Online (Sandbox Code Playgroud)

把它放在原始海报代码的上下文中(注意我们不需要 atexit 函数,因为在所有非守护线程停止之前不会调用它):

if __name__ == "__main__":
    threads = [
        Worker()
        for _ in range(5)
    ]
    
    for t in threads:
        t.start()

    for t in threads:
        t.send_message("Hello")
        #t.send_message(Terminate)

    def monitor_thread():
        main_thread = threading.main_thread()
        main_thread.join()
        shutdown_threads(threads)

    monitor = threading.Thread(target=monitor_thread)
    monitor.daemon = True
    monitor.start()
Run Code Online (Sandbox Code Playgroud)

  • 这对我来说按预期工作,但我注意到如果“monitor_thread”不是守护线程,它也可以工作。我的解释是它已经在等待“main_thread.join()”,因此当“main_thread”退出时它将被唤醒。文档说“守护进程线程在关闭时突然停止。”,这让我认为这里我们实际上可能希望“监视器”*不是*成为守护进程线程。 (2认同)