Mat*_*ker 24 python parallel-processing user-interface multithreading multiprocessing
在Python GUI(PyGTK)中,我启动一个进程(使用多处理).这个过程需要很长时间(约20分钟)才能完成.当过程完成后,我想清理它(提取结果并加入过程).我怎么知道这个过程何时完成?
我的同事在父进程中建议了一个繁忙的循环,它检查子进程是否已经完成.当然有更好的方法.
在Unix中,当分叉进程时,在子进程完成时从父进程内调用信号处理程序.但我在Python中看不到类似的东西.我错过了什么吗?
如何从父进程中观察到子进程的结束?(当然,我不想调用Process.join(),因为它会冻结GUI界面.)
这个问题不仅限于多处理:我对多线程有完全相同的问题.
man*_*est 11
我认为作为制作python多平台的一部分,像SIGCHLD这样的简单事情必须自己完成.同意,当你想要做的就是知道孩子何时完成时,这是一项更多的工作,但这真的不是那么痛苦.考虑以下使用子进程来完成工作,两个multiprocessing.Event实例,以及一个检查子进程是否完成的线程:
import threading
from multiprocessing import Process, Event
from time import sleep
def childsPlay(event):
print "Child started"
for i in range(3):
print "Child is playing..."
sleep(1)
print "Child done"
event.set()
def checkChild(event, killEvent):
event.wait()
print "Child checked, and is done playing"
if raw_input("Do again? y/n:") == "y":
event.clear()
t = threading.Thread(target=checkChild, args=(event, killEvent))
t.start()
p = Process(target=childsPlay, args=(event,))
p.start()
else:
cleanChild()
killEvent.set()
def cleanChild():
print "Cleaning up the child..."
if __name__ == '__main__':
event = Event()
killEvent = Event()
# process to do work
p = Process(target=childsPlay, args=(event,))
p.start()
# thread to check on child process
t = threading.Thread(target=checkChild, args=(event, killEvent))
t.start()
try:
while not killEvent.is_set():
print "GUI running..."
sleep(1)
except KeyboardInterrupt:
print "Quitting..."
exit(0)
finally:
print "Main done"
Run Code Online (Sandbox Code Playgroud)
加入所有创建的进程和线程是一种很好的做法,因为它有助于指示何时创建僵尸(永不完成)进程/线程.我已经改变了上面的代码,使得一个继承自threading.Thread的ChildChecker类.它的唯一目的是在一个单独的进程中启动一个作业,等待该进程完成,然后在一切完成时通知GUI.加入ChildChecker也将加入它"检查"的过程.现在,如果进程在5秒后没有加入,则线程将强制终止进程.输入"y"创建启动运行"endlessChildsPlay"的子进程,该进程必须显示强制终止.
import threading
from multiprocessing import Process, Event
from time import sleep
def childsPlay(event):
print "Child started"
for i in range(3):
print "Child is playing..."
sleep(1)
print "Child done"
event.set()
def endlessChildsPlay(event):
print "Endless child started"
while True:
print "Endless child is playing..."
sleep(1)
event.set()
print "Endless child done"
class ChildChecker(threading.Thread):
def __init__(self, killEvent):
super(ChildChecker, self).__init__()
self.killEvent = killEvent
self.event = Event()
self.process = Process(target=childsPlay, args=(self.event,))
def run(self):
self.process.start()
while not self.killEvent.is_set():
self.event.wait()
print "Child checked, and is done playing"
if raw_input("Do again? y/n:") == "y":
self.event.clear()
self.process = Process(target=endlessChildsPlay, args=(self.event,))
self.process.start()
else:
self.cleanChild()
self.killEvent.set()
def join(self):
print "Joining child process"
# Timeout on 5 seconds
self.process.join(5)
if self.process.is_alive():
print "Child did not join! Killing.."
self.process.terminate()
print "Joining ChildChecker thread"
super(ChildChecker, self).join()
def cleanChild(self):
print "Cleaning up the child..."
if __name__ == '__main__':
killEvent = Event()
# thread to check on child process
t = ChildChecker(killEvent)
t.start()
try:
while not killEvent.is_set():
print "GUI running..."
sleep(1)
except KeyboardInterrupt:
print "Quitting..."
exit(0)
finally:
t.join()
print "Main done"
Run Code Online (Sandbox Code Playgroud)
这个答案真的很简单!(我花了几天时间才弄明白。)
结合 PyGTK 的 idle_add(),你可以创建一个 AutoJoiningThread。总代码是微不足道的:
class AutoJoiningThread(threading.Thread):
def run(self):
threading.Thread.run(self)
gobject.idle_add(self.join)
Run Code Online (Sandbox Code Playgroud)
如果您想做的不仅仅是加入(例如收集结果),那么您可以扩展上述类以在完成时发出信号,如下例所示:
import threading
import time
import sys
import gobject
gobject.threads_init()
class Child:
def __init__(self):
self.result = None
def play(self, count):
print "Child starting to play."
for i in range(count):
print "Child playing."
time.sleep(1)
print "Child finished playing."
self.result = 42
def get_result(self, obj):
print "The result was "+str(self.result)
class AutoJoiningThread(threading.Thread, gobject.GObject):
__gsignals__ = {
'finished': (gobject.SIGNAL_RUN_LAST,
gobject.TYPE_NONE,
())
}
def __init__(self, *args, **kwargs):
threading.Thread.__init__(self, *args, **kwargs)
gobject.GObject.__init__(self)
def run(self):
threading.Thread.run(self)
gobject.idle_add(self.join)
gobject.idle_add(self.emit, 'finished')
def join(self):
threading.Thread.join(self)
print "Called Thread.join()"
if __name__ == '__main__':
print "Creating child"
child = Child()
print "Creating thread"
thread = AutoJoiningThread(target=child.play,
args=(3,))
thread.connect('finished', child.get_result)
print "Starting thread"
thread.start()
print "Running mainloop (Ctrl+C to exit)"
mainloop = gobject.MainLoop()
try:
mainloop.run()
except KeyboardInterrupt:
print "Received KeyboardInterrupt. Quiting."
sys.exit()
print "God knows how we got here. Quiting."
sys.exit()
Run Code Online (Sandbox Code Playgroud)
上面示例的输出将取决于线程的执行顺序,但类似于:
创建子项 创建线程 起始线程 孩子开始玩。 孩子在玩耍。 运行主循环(Ctrl+C 退出) 孩子在玩耍。 孩子在玩耍。 孩子玩完了。 调用 Thread.join() 结果是 42 ^C接收键盘中断。退出。
不可能以相同的方式创建 AutoJoiningProcess(因为我们不能跨两个不同的进程调用 idle_add()),但是我们可以使用 AutoJoiningThread 来获得我们想要的:
class AutoJoiningProcess(multiprocessing.Process):
def start(self):
thread = AutoJoiningThread(target=self.start_process)
thread.start() # automatically joins
def start_process(self):
multiprocessing.Process.start(self)
self.join()
Run Code Online (Sandbox Code Playgroud)
为了演示 AutoJoiningProcess 这里是另一个例子:
import threading
import multiprocessing
import time
import sys
import gobject
gobject.threads_init()
class Child:
def __init__(self):
self.result = multiprocessing.Manager().list()
def play(self, count):
print "Child starting to play."
for i in range(count):
print "Child playing."
time.sleep(1)
print "Child finished playing."
self.result.append(42)
def get_result(self, obj):
print "The result was "+str(self.result)
class AutoJoiningThread(threading.Thread, gobject.GObject):
__gsignals__ = {
'finished': (gobject.SIGNAL_RUN_LAST,
gobject.TYPE_NONE,
())
}
def __init__(self, *args, **kwargs):
threading.Thread.__init__(self, *args, **kwargs)
gobject.GObject.__init__(self)
def run(self):
threading.Thread.run(self)
gobject.idle_add(self.join)
gobject.idle_add(self.emit, 'finished')
def join(self):
threading.Thread.join(self)
print "Called Thread.join()"
class AutoJoiningProcess(multiprocessing.Process, gobject.GObject):
__gsignals__ = {
'finished': (gobject.SIGNAL_RUN_LAST,
gobject.TYPE_NONE,
())
}
def __init__(self, *args, **kwargs):
multiprocessing.Process.__init__(self, *args, **kwargs)
gobject.GObject.__init__(self)
def start(self):
thread = AutoJoiningThread(target=self.start_process)
thread.start()
def start_process(self):
multiprocessing.Process.start(self)
self.join()
gobject.idle_add(self.emit, 'finished')
def join(self):
multiprocessing.Process.join(self)
print "Called Process.join()"
if __name__ == '__main__':
print "Creating child"
child = Child()
print "Creating thread"
process = AutoJoiningProcess(target=child.play,
args=(3,))
process.connect('finished',child.get_result)
print "Starting thread"
process.start()
print "Running mainloop (Ctrl+C to exit)"
mainloop = gobject.MainLoop()
try:
mainloop.run()
except KeyboardInterrupt:
print "Received KeyboardInterrupt. Quiting."
sys.exit()
print "God knows how we got here. Quiting."
sys.exit()
Run Code Online (Sandbox Code Playgroud)
结果输出将与上面的示例非常相似,除了这次我们同时加入了进程和它的伴随线程:
创建子项 创建线程 起始线程 运行主循环(Ctrl+C 退出) 孩子开始玩。 孩子在玩耍。 孩子在玩耍。 孩子在玩耍。 孩子玩完了。 调用 Process.join() 结果是 [42] 调用 Thread.join() ^C接收键盘中断。退出。
很遗憾:
因此,要使用这种方法,最好仅从主循环/GUI 内创建线程/进程。
| 归档时间: |
|
| 查看次数: |
23528 次 |
| 最近记录: |