当被信号中断时,multiprocessing.Event.wait挂起

Ale*_*lex 2 python python-multithreading

我使用以下代码来处理SIGINT事件。该代码将multiprocessing.event设置为“唤醒”正在等待的主线程。

import multiprocessing
import signal

class Class1(object):
    _stop_event = multiprocessing.Event()

    @staticmethod
    def interrupt():
        Class1._stop_event.set()

    def wait(self):
        print("Waiting for _stop_event")
        if Class1._stop_event.wait(5):
            print("_stop_event set.")
        else:
            print("Timed out.")

def stop(signum, frame):
    print("Received SIG")
    Class1.interrupt()

signal.signal(signal.SIGINT, stop)

c = Class1()
c.wait()
Run Code Online (Sandbox Code Playgroud)

没有任何信号,wait方法将在10秒后超时,并且该过程将按预期方式退出,并显示以下输出:

Waiting for _stop_event
Timed out.
Run Code Online (Sandbox Code Playgroud)

发送SIGINT信号时,将处理该信号,但event.wait方法不会立即或在超时后都不会返回。该过程永远不会退出。输出为:

Waiting for _stop_event
^CReceived SIG
Run Code Online (Sandbox Code Playgroud)

我可以继续发送SIGINT。该过程将不会退出,输出为:

Waiting for _stop_event
^CReceived SIG
^CReceived SIG
^CReceived SIG
^CReceived SIG
....
Run Code Online (Sandbox Code Playgroud)

如果将Class1.wait方法替换为event.is_set检查,一切都会按预期进行:

def wait(self):
    print("Waiting for _stop_event")
    while True:
        if Class1._stop_event.is_set():
            print("_stop_event set.")
            break
Run Code Online (Sandbox Code Playgroud)

该过程退出,输出为:

Waiting for _stop_event
^CReceived SIG
_stop_event set.
Run Code Online (Sandbox Code Playgroud)

设置事件后如何使event.wait返回?等待方法甚至不再超时的原因是什么?

Ale*_*lex 5

信号仅在主线程上处理。如果主线程在系统调用中被阻止,则该系统调用将引发InterruptedError。

从Python文档中:

它们[信号]只能出现在Python解释器的“原子”指令之间

例如,time.sleep将引发InterruptedError。似乎event.wait方法不能正确处理此方案。它不会引发InterruptedError,而只是开始挂起。在我看来,这似乎是Python中的错误?


更新:

我将其缩小为multiprocessing.Event中的死锁。如果主线程正在等待事件,并且同时有一个信号在被中断的主线程上设置该事件,则multiprocessing.event.set()和multiprocessing.event.wait()方法将彼此死锁。

同样,该行为在很大程度上取决于平台。例如,time.sleep()在Windows上会引发InterruptedError,而在Linux上只是返回。


一个非常笨拙的解决方法是保持主线程空闲以处理信号。

import multiprocessing
import signal
import threading
import time


class Class1(object):
    _stop_event = multiprocessing.Event()

    @staticmethod
    def interrupt():
        Class1._stop_event.set()

    def wait_timeout(self):
        print("Waiting for _stop_event")
        if Class1._stop_event.wait(30):
            print("_stop_event set.")
        else:
            print("Timeout")


def stop(signum, frame):
    print("Received SIG")
    Class1.interrupt()
    exit_event.set()


def run():
    c = Class1()
    c.wait_timeout()

t = threading.Thread(target=run)
t.daemon = False
t.start()

exit_event = multiprocessing.Event()
signal.signal(signal.SIGINT, stop)

while not exit_event.is_set():
    # Keep a main thread around to handle the signal and
    # that thread must not be in a event.wait()!
    try:
        time.sleep(500)
    except InterruptedError:
        # We were interrupted by the incoming signal.
        # Let the signal handler stop the process gracefully.
        pass
Run Code Online (Sandbox Code Playgroud)

这真丑。有人请提供更优雅的解决方案。

  • 您还可以将 exit_event.set() 委托给另一个线程,而不是调用它并创建死锁。基本上,信号回调变成:`Thread(target=exit_event.set).start()` (2认同)

joh*_*all 4

你们一定会喜欢这个的。使用threading.Event,不使用multiprocessing.Event。然后,当您按下时,^C信号处理程序就会像它应该的那样被调用!

来源

import threading
import signal

class Class1(object):
    _stop_event = threading.Event()

    @staticmethod
    def interrupt():
        Class1._stop_event.set()

    def wait(self):
        print("Waiting for _stop_event")
        if Class1._stop_event.wait(5):
            print("_stop_event set.")
        else:
            print("Timed out.")

def stop(signum, frame):
    print("Received SIG")
    Class1.interrupt()

signal.signal(signal.SIGINT, stop)

c = Class1()
c.wait()
Run Code Online (Sandbox Code Playgroud)

输出

Waiting for _stop_event
^CReceived SIG
_stop_event set.
Run Code Online (Sandbox Code Playgroud)