How to use Python multiprocessing to terminate a process

Dan*_*ohn 15 python multiprocessing python-multiprocessing

I have some code that needs to run against several other systems that may hang or have problems outside of my control. I would like to use Python's multiprocessing to spawn child processes that run independently of the main program, and then terminate them when they hang or have problems, but I am not sure of the best way to go about this.

When terminate is called it does kill the child process, but then it becomes a defunct zombie that is not released until the process object is gone. The example code below, where the loop never ends, works to kill it and allow a respawn when called again, but it does not seem like a good way of going about this (i.e. it would be better if multiprocessing.Process() were created in __init__()).

Does anyone have a suggestion?

import multiprocessing
import time

class Process(object):
    def __init__(self):
        self.thing = Thing()
        self.running_flag = multiprocessing.Value("i", 1)

    def run(self):
        self.process = multiprocessing.Process(target=self.thing.worker, args=(self.running_flag,))
        self.process.start()
        print self.process.pid

    def pause_resume(self):
        self.running_flag.value = not self.running_flag.value

    def terminate(self):
        self.process.terminate()

class Thing(object):
    def __init__(self):
        self.count = 1

    def worker(self,running_flag):
        while True:
            if running_flag.value:
                self.do_work()

    def do_work(self):
        print "working {0} ...".format(self.count)
        self.count += 1
        time.sleep(1)

Pie*_*Pah 8

You can run the child process in the background as a daemon process.

process.daemon = True

Any errors and hangs (or infinite loops) in the daemon process will not affect the main process, and the daemon will only be terminated once the main process exits.
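
For illustration, a minimal sketch of the daemon approach; the worker function here is invented for the example:

import multiprocessing
import time

def flaky_worker():
    # Stand-in for a call to an external system that may hang indefinitely
    time.sleep(1000)

if __name__ == "__main__":
    p = multiprocessing.Process(target=flaky_worker)
    p.daemon = True        # must be set before start()
    p.start()
    time.sleep(2)          # the main process goes about its own work
    # No join() needed: daemonic children are terminated when the main process exits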

This will work for simple problems, until you run into many child daemon processes that keep taking memory from the parent process without any explicit control.

The best approach is to set up a Queue so that all the child processes can communicate with the parent process, which lets us join them and clean up nicely. Here is some simple code that checks whether a child process is hanging (a.k.a. time.sleep(1000)) and sends a message to the queue so the main process can take action on it:

import multiprocessing as mp
import time
import Queue as queue  # Python 2 stdlib module; it is named "queue" in Python 3

running_flag = mp.Value("i", 1)

def worker(running_flag, q):
    count = 1
    while True:
        if running_flag.value:
            print "working {0} ...".format(count)
            count += 1
            q.put(count)
            time.sleep(1)
            if count > 3:
                # Simulate hanging with sleep
                print "hanging..."
                time.sleep(1000)

def watchdog(q):
    """
    This checks the queue for updates and sends a signal to it
    when the child process isn't sending anything for too long
    """
    while True:
        try:
            msg = q.get(timeout=10.0)
        except queue.Empty as e:
            print "[WATCHDOG]: Maybe WORKER is slacking"
            q.put("KILL WORKER")

def main():
    """The main process"""
    q = mp.Queue()

    workr = mp.Process(target=worker, args=(running_flag, q))
    wdog = mp.Process(target=watchdog, args=(q,))

    # run the watchdog as daemon so it terminates with the main process
    wdog.daemon = True

    workr.start()
    print "[MAIN]: starting process P1"
    wdog.start()

    # Poll the queue
    while True:
        msg = q.get()
        if msg == "KILL WATCHDOG":
            print "[MAIN]: Terminating slacking WORKER"
            workr.terminate()
            time.sleep(0.1)
            if not workr.is_alive():
                print "[MAIN]: WORKER is a goner"
                workr.join(timeout=1.0)
                print "[MAIN]: Joined WORKER successfully!"
                q.close()
                break # watchdog process daemon gets terminated

if __name__ == '__main__':
    main()

Without terminating the worker, attempting to join() it to the main process would have blocked forever, since the worker never finishes.
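
As a side note, if you only need to detect a hang without a separate watchdog process, a bounded join() is enough; here is a minimal self-contained sketch (the worker name is invented for the example):

import multiprocessing
import time

def never_returns():
    while True:
        time.sleep(1)      # stand-in for a call that never comes back

if __name__ == "__main__":
    p = multiprocessing.Process(target=never_returns)
    p.start()
    p.join(timeout=3.0)    # without the timeout this call would block forever
    if p.is_alive():       # still running after the timeout, so it is hung
        p.terminate()
        p.join()           # reap it so it does not linger as a zombie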

  • I think there is a typo in your code. Shouldn't it be: if msg == "KILL WORKER": (2 upvotes)

nox*_*fox 6

The way Python multiprocessing handles processes is a bit confusing.

From the multiprocessing programming guidelines:

Joining zombie processes

On Unix when a process finishes but has not been joined it becomes a zombie. There should never be very many because each time a new process starts (or active_children() is called) all completed processes which have not yet been joined will be joined. Also calling a finished process's Process.is_alive will join the process. Even so it is probably good practice to explicitly join all the processes that you start.

To avoid a process turning into a zombie, you need to call its join() method once you kill it.
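
A minimal sketch of that terminate-then-join pattern (the worker here is hypothetical):

import multiprocessing
import time

def hung_worker():
    time.sleep(1000)       # simulate a hanging call

if __name__ == "__main__":
    p = multiprocessing.Process(target=hung_worker)
    p.start()
    p.terminate()          # sends SIGTERM to the child on Unix
    p.join()               # reaps the child so no <defunct> zombie is left behind
    print(p.exitcode)      # on Unix a negative value means it was ended by a signal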

If you want a simpler way to deal with hanging calls in your system, you can take a look at pebble.
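
For reference, here is a sketch along the lines of pebble's documented ProcessPool usage; treat the exact API (ProcessPool, schedule, the timeout argument) as something to verify against the current pebble documentation:

from concurrent.futures import TimeoutError
from pebble import ProcessPool
import time

def flaky_call(x):
    time.sleep(1000)       # simulate a hanging external system
    return x

if __name__ == "__main__":
    with ProcessPool() as pool:
        future = pool.schedule(flaky_call, args=(1,), timeout=5)
        try:
            print(future.result())
        except TimeoutError:
            print("call took longer than 5 seconds; pebble stops the worker for us")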

  • So is it still necessary to call process.terminate() to make sure the process has terminated, or is join() alone enough to kill it? (2 upvotes)
  • terminate issues a termination request to the target process via the SIGTERM signal. Whether the request is honoured is (mostly) up to the process itself; if the process has already ended, it has no effect. join merely instructs the OS to reclaim the process's resources if it has ended, otherwise it blocks until that happens. join must always be called on finished processes, otherwise the OS will pile up the resources of exhausted processes. These are usually referred to as "zombie" processes, since they are literally empty shells of what was once running. (2 upvotes)

Leo*_*Leo 6

(Not enough reputation points to comment, hence a full answer)

@PieOhPah: thank you for this very nice example.
Unfortunately there is one small flaw that keeps the watchdog from killing the worker:

if msg == "KILL WATCHDOG":

It should be:

if msg == "KILL WORKER":

So the code becomes (with the prints updated for Python 3):

import multiprocessing as mp
import time
import queue

running_flag = mp.Value("i", 1)

def worker(running_flag, q):
    count = 1
    while True:
        if running_flag.value:
            print ("working {0} ...".format(count))
            count += 1
            q.put(count)
            time.sleep(1)
            if count > 3:
                # Simulate hanging with sleep
                print ("hanging...")
                time.sleep(1000)

def watchdog(q):
    """
    This checks the queue for updates and sends a signal to it
    when the child process isn't sending anything for too long
    """
    while True:
        try:
            msg = q.get(timeout=10.0)
        except queue.Empty as e:
            print ("[WATCHDOG]: Maybe WORKER is slacking")
            q.put("KILL WORKER")

def main():
    """The main process"""
    q = mp.Queue()

    workr = mp.Process(target=worker, args=(running_flag, q))
    wdog = mp.Process(target=watchdog, args=(q,))

    # run the watchdog as daemon so it terminates with the main process
    wdog.daemon = True

    workr.start()
    print ("[MAIN]: starting process P1")
    wdog.start()

    # Poll the queue
    while True:
        msg = q.get()
#        if msg == "KILL WATCHDOG":
        if msg == "KILL WORKER":
            print ("[MAIN]: Terminating slacking WORKER")
            workr.terminate()
            time.sleep(0.1)
            if not workr.is_alive():
                print ("[MAIN]: WORKER is a goner")
                workr.join(timeout=1.0)
                print ("[MAIN]: Joined WORKER successfully!")
                q.close()
                break # watchdog process daemon gets terminated

if __name__ == '__main__':
    main()