In Python, how do I know when a process has finished?

Mat*_*ker 24 python parallel-processing user-interface multithreading multiprocessing

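From within a Python GUI (PyGTK) I start a process (using multiprocessing). The process takes a long time (about 20 minutes) to finish. When it has finished, I would like to clean it up (extract the results and join the process). How do I know when the process has finished?

My colleague suggested a busy loop in the parent process that checks whether the child process has finished. Surely there is a better way.

In Unix, when a process is forked, a signal handler is called in the parent process when the child process finishes. But I cannot see anything like that in Python. Am I missing something?

How can the end of a child process be observed from within the parent process? (Of course, I do not want to call Process.join(), as it would freeze the GUI.)

This question is not limited to multiprocessing: I have exactly the same problem with multithreading.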

man*_*est 11

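I think that, as part of making Python multi-platform, simple things like SIGCHLD have to be done yourself. Agreed, this is a little more work when all you want is to know when the child is done, but it really isn't that painful. Consider the following, which uses a child process to do the work, two multiprocessing.Event instances, and a thread that checks whether the child process is done: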

import threading
from multiprocessing import Process, Event
from time import sleep

def childsPlay(event):
    print "Child started"
    for i in range(3):
        print "Child is playing..."
        sleep(1)
    print "Child done"
    event.set()

def checkChild(event, killEvent):
    event.wait()
    print "Child checked, and is done playing"
    if raw_input("Do again? y/n:") == "y":
        event.clear()
        t = threading.Thread(target=checkChild, args=(event, killEvent))
        t.start()
        p = Process(target=childsPlay, args=(event,))
        p.start()
    else:
        cleanChild()
        killEvent.set()

def cleanChild():
    print "Cleaning up the child..."

if __name__ == '__main__':
    event = Event()
    killEvent = Event()

    # process to do work
    p = Process(target=childsPlay, args=(event,))
    p.start()

    # thread to check on child process
    t = threading.Thread(target=checkChild, args=(event, killEvent))
    t.start()

    try:
        while not killEvent.is_set():
            print "GUI running..."
            sleep(1)
    except KeyboardInterrupt:
        print "Quitting..."
        exit(0)
    finally:
        print "Main done"
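
The question notes that the same problem applies to threads, so here is a minimal sketch of the watcher idea reduced to a worker thread. It is written for Python 3, unlike the Python 2 listing above, and the names `worker`, `run_with_watcher`, and `on_done` are made up for illustration. Note that `on_done` is called from the watcher thread, not from the GUI thread.

```python
import threading
import time


def worker(done, results):
    """Pretend to do a long job, store a result, then signal completion."""
    time.sleep(0.2)
    results.append(42)
    done.set()


def run_with_watcher(on_done):
    """Start a worker plus a watcher thread and return the watcher.

    The watcher blocks on the event so the caller does not have to.
    """
    done = threading.Event()
    results = []
    job = threading.Thread(target=worker, args=(done, results))

    def watch():
        done.wait()       # blocks only this watcher thread
        job.join()        # the worker has signalled, so this returns promptly
        on_done(results)  # called from the watcher thread

    watcher = threading.Thread(target=watch)
    job.start()
    watcher.start()
    return watcher


if __name__ == "__main__":
    watcher = run_with_watcher(lambda results: print("result:", results))
    while watcher.is_alive():
        print("GUI running...")
        time.sleep(0.1)
    watcher.join()
```

The loop at the bottom only stands in for a GUI main loop; it stays responsive because nothing in it blocks on the worker.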

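EDIT

Joining all the processes and threads you create is good practice, because it helps reveal when zombie (never-finishing) processes or threads are being created. I've altered the code above into a ChildChecker class that inherits from threading.Thread. Its sole purpose is to start a job in a separate process, wait for that process to finish, and then notify the GUI when everything is complete. Joining the ChildChecker also joins the process it is "checking". Now, if the process has not joined after 5 seconds, the thread forcibly terminates it. Entering "y" starts a child process running "endlessChildsPlay", which demonstrates the forced termination.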

import threading
from multiprocessing import Process, Event
from time import sleep

def childsPlay(event):
    print "Child started"
    for i in range(3):
        print "Child is playing..."
        sleep(1)
    print "Child done"
    event.set()

def endlessChildsPlay(event):
    print "Endless child started"
    while True:
        print "Endless child is playing..."
        sleep(1)
        event.set()
    print "Endless child done"

class ChildChecker(threading.Thread):
    def __init__(self, killEvent):
        super(ChildChecker, self).__init__()
        self.killEvent = killEvent
        self.event = Event()
        self.process = Process(target=childsPlay, args=(self.event,))

    def run(self):
        self.process.start()

        while not self.killEvent.is_set():
            self.event.wait()
            print "Child checked, and is done playing"
            if raw_input("Do again? y/n:") == "y":
                self.event.clear()
                self.process = Process(target=endlessChildsPlay, args=(self.event,))
                self.process.start()
            else:
                self.cleanChild()
                self.killEvent.set()

    def join(self):
        print "Joining child process"
        # Timeout on 5 seconds
        self.process.join(5)

        if self.process.is_alive():
            print "Child did not join!  Killing.."
            self.process.terminate()
        print "Joining ChildChecker thread"
        super(ChildChecker, self).join()


    def cleanChild(self):
        print "Cleaning up the child..."

if __name__ == '__main__':
    killEvent = Event()
    # thread to check on child process
    t = ChildChecker(killEvent)
    t.start()

    try:
        while not killEvent.is_set():
            print "GUI running..."
            sleep(1)
    except KeyboardInterrupt:
        print "Quitting..."
        exit(0)
    finally:
        t.join()
        print "Main done"
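
The join-with-timeout step in `ChildChecker.join()` can be isolated into a small helper. This is a sketch for Python 3 under the assumption that forcibly terminating the child is acceptable; `join_or_terminate` is a hypothetical name, and `time.sleep` stands in for a job that never finishes on its own.

```python
import multiprocessing
import time


def join_or_terminate(proc, timeout):
    """Wait up to `timeout` seconds for `proc`; terminate it if still alive.

    Returns True if the process had to be terminated.
    """
    proc.join(timeout)
    if proc.is_alive():
        proc.terminate()
        proc.join()  # reap the terminated child so it does not linger
        return True
    return False


if __name__ == "__main__":
    # time.sleep stands in for a job that never finishes on its own.
    stuck = multiprocessing.Process(target=time.sleep, args=(60,))
    stuck.start()
    print("terminated:", join_or_terminate(stuck, 1))
```

A terminated child gets no chance to clean up after itself, so this is best kept as a last resort for processes that hold no shared locks or half-written files.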


Mat*_*ker 5

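The answer turns out to be really simple! (It just took me days to work it out.)

Combined with PyGTK's idle_add(), you can create an AutoJoiningThread. The total code is borderline trivial: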

class AutoJoiningThread(threading.Thread):
    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
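
To see why the join is handed to idle_add() rather than called directly, here is a sketch for Python 3 that runs without gobject. `MiniLoop` is a hypothetical stand-in for the GUI main loop and is not a gobject API; its `idle_add` only imitates the idea of "run this callback later on the loop's thread". A thread cannot join itself (`Thread.join()` raises RuntimeError in that case), so the finished thread asks the loop's thread to do it.

```python
import queue
import threading
import time


class MiniLoop:
    """Hypothetical stand-in for a GUI main loop (not a gobject API)."""

    def __init__(self):
        self._callbacks = queue.Queue()

    def idle_add(self, func, *args):
        """Thread-safe: schedule func(*args) to run on the loop's thread."""
        self._callbacks.put((func, args))

    def run_one(self, timeout):
        """Run one scheduled callback; return False if none arrived in time."""
        try:
            func, args = self._callbacks.get(timeout=timeout)
        except queue.Empty:
            return False
        func(*args)
        return True


class AutoJoiningThread(threading.Thread):
    def __init__(self, main_loop, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.main_loop = main_loop

    def run(self):
        super().run()
        # Defer the join to the loop's thread: a thread cannot join itself.
        self.main_loop.idle_add(self.join)


if __name__ == "__main__":
    loop = MiniLoop()
    thread = AutoJoiningThread(loop, target=time.sleep, args=(0.2,))
    thread.start()
    loop.run_one(timeout=5)  # the join happens here, on the main thread
    print("alive after join:", thread.is_alive())
```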

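If you want to do more than just join (such as collecting results), you can extend the above class to emit a signal on completion, as in the following example: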

import threading
import time
import sys
import gobject
gobject.threads_init()

class Child:
    def __init__(self):
        self.result = None

    def play(self, count):
        print "Child starting to play."
        for i in range(count):
            print "Child playing."
            time.sleep(1)
        print "Child finished playing."
        self.result = 42

    def get_result(self, obj):
        print "The result was "+str(self.result)

class AutoJoiningThread(threading.Thread, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        threading.Thread.join(self)
        print "Called Thread.join()"

if __name__ == '__main__':
    print "Creating child"
    child = Child()
    print "Creating thread"
    thread = AutoJoiningThread(target=child.play,
                               args=(3,))
    thread.connect('finished', child.get_result)
    print "Starting thread"
    thread.start()
    print "Running mainloop (Ctrl+C to exit)"
    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        print "Received KeyboardInterrupt.  Quiting."
        sys.exit()

    print "God knows how we got here.  Quiting."
    sys.exit()

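The output of the above example depends on the order in which the threads execute, but it will be similar to:

Creating child
Creating thread
Starting thread
Child starting to play.
 Child playing.
Running mainloop (Ctrl+C to exit)
Child playing.
Child playing.
Child finished playing.
Called Thread.join()
The result was 42
^CReceived KeyboardInterrupt.  Quiting.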

It is not possible to create an AutoJoiningProcess in the same way (because we cannot call idle_add() across two different processes), but we can use an AutoJoiningThread to get what we want:

class AutoJoiningProcess(multiprocessing.Process):
    def start(self):
        thread = AutoJoiningThread(target=self.start_process)
        thread.start() # automatically joins

    def start_process(self):
        multiprocessing.Process.start(self)
        self.join()
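
The same idea can be sketched without gobject for Python 3: a helper thread does the blocking join and then reports back. `notify_when_finished` and `on_finished` are hypothetical names, and unlike the class above this sketch starts the process in the calling thread and only joins it in the helper thread. In a real GUI, `on_finished` would presumably hand off to the main loop (for example via idle_add()), since it runs in the helper thread.

```python
import multiprocessing
import threading
import time


def notify_when_finished(proc, on_finished):
    """Start `proc` and call on_finished(proc) once it has exited.

    The blocking join happens in a helper thread, so the caller returns
    immediately. on_finished runs in that helper thread.
    """
    proc.start()

    def wait_for_exit():
        proc.join()
        on_finished(proc)

    watcher = threading.Thread(target=wait_for_exit)
    watcher.start()
    return watcher


if __name__ == "__main__":
    finished = threading.Event()
    job = multiprocessing.Process(target=time.sleep, args=(0.5,))
    watcher = notify_when_finished(job, lambda p: finished.set())
    while not finished.is_set():
        print("GUI running...")
        time.sleep(0.1)
    watcher.join()
    print("exit code:", job.exitcode)
```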

To demonstrate AutoJoiningProcess, here is another example:

import threading
import multiprocessing
import time
import sys
import gobject
gobject.threads_init()

class Child:
    def __init__(self):
        self.result = multiprocessing.Manager().list()

    def play(self, count):
        print "Child starting to play."
        for i in range(count):
            print "Child playing."
            time.sleep(1)
        print "Child finished playing."
        self.result.append(42)

    def get_result(self, obj):
        print "The result was "+str(self.result)

class AutoJoiningThread(threading.Thread, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
    }

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        threading.Thread.join(self)
        print "Called Thread.join()"

class AutoJoiningProcess(multiprocessing.Process, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def start(self):
        thread = AutoJoiningThread(target=self.start_process)
        thread.start()

    def start_process(self):
        multiprocessing.Process.start(self)
        self.join()
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        multiprocessing.Process.join(self)
        print "Called Process.join()"

if __name__ == '__main__':
    print "Creating child"
    child = Child()
    print "Creating thread"
    process = AutoJoiningProcess(target=child.play,
                               args=(3,))
    process.connect('finished',child.get_result)
    print "Starting thread"
    process.start()
    print "Running mainloop (Ctrl+C to exit)"
    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        print "Received KeyboardInterrupt.  Quiting."
        sys.exit()

    print "God knows how we got here.  Quiting."
    sys.exit()

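The resulting output will be very similar to the example above, except that this time both the process and its attendant thread are joined:

Creating child
Creating thread
Starting thread
Running mainloop (Ctrl+C to exit)
 Child starting to play.
Child playing.
Child playing.
Child playing.
Child finished playing.
Called Process.join()
The result was [42]
Called Thread.join()
^CReceived KeyboardInterrupt.  Quiting.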

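Unfortunately:

  1. This solution depends on gobject, because it uses idle_add(). gobject is used by PyGTK.
  2. This is not a true parent/child relationship. If one of these threads is started by another thread, it will still be joined by the thread running the mainloop, not by the parent thread. The same problem applies to AutoJoiningProcess, except that there I imagine an exception would be thrown.

So, to use this approach, it is best to create threads/processes only from within the mainloop/GUI.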