Python多处理 - 捕获信号以重新启动子进程或关闭父进程

Pyt*_*ice 6 python linux signals python-multiprocessing

我正在使用多处理库来生成两个子进程.我想确保只要父进程处于活动状态,如果子进程死掉(接收SIGKILL或SIGTERM),它们就会自动重启.另一方面,如果父进程收到SIGTERM/SIGINT,我希望它终止所有子进程然后退出.

这就是我解决问题的方法:

import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN
from functools import partial
import multiprocessing
import setproctitle

class HelloWorld(multiprocessing.Process):
    def __init__(self):
        super(HelloWorld, self).__init__()

        # ignore, let parent handle it
        signal(SIGTERM, SIG_IGN)

    def run(self):

        setproctitle.setproctitle("helloProcess")

        while True:
            print "Hello World"
            time.sleep(1)

class Counter(multiprocessing.Process):
    def __init__(self):
        super(Counter, self).__init__()

        self.counter = 1

        # ignore, let parent handle it
        signal(SIGTERM, SIG_IGN)

    def run(self):

        setproctitle.setproctitle("counterProcess")

        while True:
            print self.counter
            time.sleep(1)
            self.counter += 1


def signal_handler(helloProcess, counterProcess, signum, frame):

    print multiprocessing.active_children()
    print "helloProcess: ", helloProcess
    print "counterProcess: ", counterProcess

    if signum == 17:

        print "helloProcess: ", helloProcess.is_alive()

        if not helloProcess.is_alive():
            print "Restarting helloProcess"

            helloProcess = HelloWorld()
            helloProcess.start()

        print "counterProcess: ", counterProcess.is_alive()

        if not counterProcess.is_alive():
            print "Restarting counterProcess"

            counterProcess = Counter()
            counterProcess.start()

    else:

        if helloProcess.is_alive():
            print "Stopping helloProcess"
            helloProcess.terminate()

        if counterProcess.is_alive():
            print "Stopping counterProcess"
            counterProcess.terminate()

        sys.exit(0)



if __name__ == '__main__':

    helloProcess = HelloWorld()
    helloProcess.start()

    counterProcess = Counter()
    counterProcess.start()

    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
        signal(signame, partial(signal_handler, helloProcess, counterProcess))

    multiprocessing.active_children()
Run Code Online (Sandbox Code Playgroud)

如果我将SIGKILL发送到counterProcess,它将正确重启.但是,将SIGKILL发送到helloProcess还会重新启动counterProcess而不是helloProcess吗?

如果我将SIGTERM发送到父进程,父进程将退出,但子进程将成为孤立并继续.我该如何纠正这种行为?

tho*_*ter 5

这个代码有几个问题,所以我将在sequentailly中检查它们.

如果我将SIGKILL发送到counterProcess,它将正确重启.但是,将SIGKILL发送到helloProcess还会重新启动counterProcess而不是helloProcess吗?

这种奇特的行为很可能是由于主进程multiprocessing.active_children()中没有阻塞调用,因为它并不真正起作用.我无法解释程序行为的确切原因,但在__main__函数中添加阻塞调用,例如.

while True:
    time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

解决了这个问题.

另一个非常严重的问题是将对象传递给处理程序的方式:

helloProcess = HelloWorld()
...
partial(signal_handler, helloProcess, counterProcess)
Run Code Online (Sandbox Code Playgroud)

考虑到你在里面创建新对象,这是绝对的:

if not helloProcess.is_alive():
    print "Restarting helloProcess"

    helloProcess = HelloWorld()
    helloProcess.start()
Run Code Online (Sandbox Code Playgroud)

请注意,两个对象对对象使用不同的别名HelloWorld().部分对象在__main__函数中绑定到别名,而回调中的对象绑定到其本地范围别名.因此,通过将新对象分配给本地范围别名,您实际上不会影响回调绑定的对象(它仍然绑定到在__main__范围中创建的对象).

您可以通过在回调范围内以相同方式使用新对象重新绑定信号回调来修复它:

def signal_handler(...):
    ...
    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
        signal(signame, partial(signal_handler, helloProcess, counterProcess))
    ...
Run Code Online (Sandbox Code Playgroud)

但是,这会导致另一个陷阱,因为现在每个子进程都将继承父进程的回调,并在每次接收信号时访问它.要修复它,您可以在创建子进程之前临时将信号处理程序设置为默认值:

for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
    signal(signame, SIG_DFL)
Run Code Online (Sandbox Code Playgroud)

最后,您可能希望在终止它们之前压制来自子进程的任何信号,否则它们将再次触发回调:

signal(SIGCHLD, SIG_IGN)
Run Code Online (Sandbox Code Playgroud)

请注意,您需要重新设计应用程序的体系结构并利用所multiprocessing提供的一些功能.

最终代码:

import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN, SIG_DFL
from functools import partial
import multiprocessing
#import setproctitle

class HelloWorld(multiprocessing.Process):
    def __init__(self):
        super(HelloWorld, self).__init__()

        # ignore, let parent handle it
        #signal(SIGTERM, SIG_IGN)

    def run(self):

        #setproctitle.setproctitle("helloProcess")

        while True:
            print "Hello World"
            time.sleep(1)

class Counter(multiprocessing.Process):
    def __init__(self):
        super(Counter, self).__init__()

        self.counter = 1

        # ignore, let parent handle it
        #signal(SIGTERM, SIG_IGN)

    def run(self):

        #setproctitle.setproctitle("counterProcess")

        while True:
            print self.counter
            time.sleep(1)
            self.counter += 1


def signal_handler(helloProcess, counterProcess, signum, frame):

    print multiprocessing.active_children()
    print "helloProcess: ", helloProcess
    print "counterProcess: ", counterProcess

    print "current_process: ", multiprocessing.current_process()

    if signum == 17:

        # Since each new child inherits current signal handler,
        # temporarily set it to default before spawning new child.
        for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
            signal(signame, SIG_DFL)

        print "helloProcess: ", helloProcess.is_alive()

        if not helloProcess.is_alive():
            print "Restarting helloProcess"

            helloProcess = HelloWorld()
            helloProcess.start()

        print "counterProcess: ", counterProcess.is_alive()

        if not counterProcess.is_alive():
            print "Restarting counterProcess"

            counterProcess = Counter()
            counterProcess.start()

        # After new children are spawned, revert to old signal handling policy.
        for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
            signal(signame, partial(signal_handler, helloProcess, counterProcess))


    else:

        # Ignore any signal that child communicates before quit   
        signal(SIGCHLD, SIG_IGN) 

        if helloProcess.is_alive():
            print "Stopping helloProcess"
            helloProcess.terminate()

        if counterProcess.is_alive():
            print "Stopping counterProcess"
            counterProcess.terminate()

        sys.exit(0)



if __name__ == '__main__':

    helloProcess = HelloWorld()
    helloProcess.start()

    counterProcess = Counter()
    counterProcess.start()

    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
        signal(signame, partial(signal_handler, helloProcess, counterProcess))

    while True:
        print multiprocessing.active_children()
        time.sleep(1)
Run Code Online (Sandbox Code Playgroud)


Niz*_*med 4

要从signal.SIGCHLD处理程序重新创建死去的孩子,母亲必须调用函数之一os.wait,因为Process.is_alive在这里不起作用。
尽管可能,但它很复杂,因为signal.SIGCHLD当其中一个孩子的状态发生变化时signal.SIGSTOPsignal.SIGCONT或者孩子收到任何其他终止信号时,它就会传递给母亲。
因此,signal.SIGCHLD处理者必须区分孩子的这些状态。仅仅在signal.SIGCHLD分娩时重新创造孩子可能会创造出不必要的孩子。

以下代码使用os.waitpidwithos.WNOHANG使其成为非阻塞,os.WUNTRACEDos.WCONTINUED用于学习 if signal.SIGCHLDis 来自signal.SIGSTOPor signal.SIGCONT
os.waitpid不起作用,(0, 0)即如果任何实例Processprint编辑,即str(Process())在调用之前,则返回os.waitpid

import sys
import time
from signal import signal, pause, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_DFL
import multiprocessing
import os

class HelloWorld(multiprocessing.Process):
    def run(self):
        # reset SIGTERM to default for Process.terminate to work
        signal(SIGTERM, SIG_DFL)
        while True:
            print "Hello World"
            time.sleep(1)

class Counter(multiprocessing.Process):
    def __init__(self):
        super(Counter, self).__init__()
        self.counter = 1

    def run(self):
        # reset SIGTERM to default for Process.terminate to work
        signal(SIGTERM, SIG_DFL)
        while True:
            print self.counter
            time.sleep(1)
            self.counter += 1


def signal_handler(signum, _):
    global helloProcess, counterProcess

    if signum == SIGCHLD:
        pid, status = os.waitpid(-1, os.WNOHANG|os.WUNTRACED|os.WCONTINUED)
        if os.WIFCONTINUED(status) or os.WIFSTOPPED(status):
            return
        if os.WIFSIGNALED(status) or os.WIFEXITED(status):
            if helloProcess.pid == pid:
                print("Restarting helloProcess")
                helloProcess = HelloWorld()
                helloProcess.start()

            elif counterProcess.pid == pid:
                print("Restarting counterProcess")
                counterProcess = Counter()
                counterProcess.start()

    else:
        # mother shouldn't be notified when it terminates children
        signal(SIGCHLD, SIG_DFL)
        if helloProcess.is_alive():
            print("Stopping helloProcess")
            helloProcess.terminate()

        if counterProcess.is_alive():
            print("Stopping counterProcess")
            counterProcess.terminate()

        sys.exit(0)

if __name__ == '__main__':

    helloProcess = HelloWorld()
    helloProcess.start()

    counterProcess = Counter()
    counterProcess.start()

    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
        signal(signame, signal_handler)

    while True:
        pause()
Run Code Online (Sandbox Code Playgroud)

以下代码重新创建死去的孩子而不使用signal.SIGCHLD. 所以它比前者更简单。
创建两个子进程后,母进程设置一个名为term_childSIGINT、SIGTERM、SIGQUIT 的信号处理程序。term_child在调用时终止并加入每个子进程。

母进程不断检查子进程是否还活着,并在必要时在while循环中重新创建它们。

由于每个孩子都会从母亲那里继承信号处理程序,因此SIGINT应将处理程序重置为其默认值才能Process.terminate工作

import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT
import multiprocessing

class HelloWorld(multiprocessing.Process):    
    def run(self):
        signal(SIGTERM, SIG_DFL)
        while True:
            print "Hello World"
            time.sleep(1)

class Counter(multiprocessing.Process):
    def __init__(self):
        super(Counter, self).__init__()
        self.counter = 1

    def run(self):
        signal(SIGTERM, SIG_DFL)
        while True:
            print self.counter
            time.sleep(1)
            self.counter += 1

def term_child(_, __):
    for child in children:
        child.terminate()
        child.join()
    sys.exit(0)

if __name__ == '__main__':

    children = [HelloWorld(), Counter()]
    for child in children:
        child.start()

    for signame in (SIGINT, SIGTERM, SIGQUIT):
        signal(signame, term_child)

    while True:
        for i, child in enumerate(children):
            if not child.is_alive():
                children[i] = type(child)()
                children[i].start()
        time.sleep(1)
Run Code Online (Sandbox Code Playgroud)