Dan*_*lch 5 python parallel-processing process multiprocessing
我正在尝试使用多个进程创建一个程序,如果发生错误,我想干净地终止所有生成的进程.下面我已经写了一些伪类型代码,我认为我需要做什么,但我不知道最好的方法是与错误发生的所有进程进行通信,它们应该终止.
我想我应该使用类来做这类事情,但我对Python很陌生,所以我只想先了解一下基础知识.
#imports
exitFlag = True
# Function for threads to process
def url_thread_worker( ):
# while exitFlag:
try:
# do something
except:
# we've ran into a problem, we need to kill all the spawned processes and cleanly exit the program
exitFlag = False
def processStarter( ):
process_1 = multiprocessing.Process( name="Process-1", target=url_thread_worker, args=( ) )
process_2 = multiprocessing.Process( name="Process-2", target=url_thread_worker, args=( ) )
process_1.start()
process_2.start()
if __name__ == '__main__':
processStarter( )
Run Code Online (Sandbox Code Playgroud)
提前致谢
这是我的建议:
import multiprocessing
import threading
import time
def good_worker():
print "[GoodWorker] Starting"
time.sleep(4)
print "[GoodWorker] all good"
def bad_worker():
print "[BadWorker] Starting"
time.sleep(2)
raise Exception("ups!")
class MyProcManager(object):
def __init__(self):
self.procs = []
self.errors_flag = False
self._threads = []
self._lock = threading.Lock()
def terminate_all(self):
with self._lock:
for p in self.procs:
if p.is_alive():
print "Terminating %s" % p
p.terminate()
def launch_proc(self, func, args=(), kwargs= {}):
t = threading.Thread(target=self._proc_thread_runner,
args=(func, args, kwargs))
self._threads.append(t)
t.start()
def _proc_thread_runner(self, func, args, kwargs):
p = multiprocessing.Process(target=func, args=args, kwargs=kwargs)
self.procs.append(p)
p.start()
while p.exitcode is None:
p.join()
if p.exitcode > 0:
self.errors_flag = True
self.terminate_all()
def wait(self):
for t in self._threads:
t.join()
if __name__ == '__main__':
proc_manager = MyProcManager()
proc_manager.launch_proc(good_worker)
proc_manager.launch_proc(good_worker)
proc_manager.launch_proc(bad_worker)
proc_manager.wait()
if proc_manager.errors_flag:
print "Errors flag is set: some process crashed"
else:
print "Everything closed cleanly"
Run Code Online (Sandbox Code Playgroud)
您需要为每个进程运行一个包装线程,等待其结束。当进程结束时,检查退出代码:如果> 0,则意味着它引发了一些未处理的异常。现在调用terminate_all()来关闭所有剩余的活动进程。包装器线程也将完成,因为它们依赖于进程运行。
此外,在您的代码中,您可以完全自由地随时调用 proc_manager.terminate_all() 。您可以检查不同线程中的某些标志或类似的东西。
希望这对您的情况有好处。
PS:顺便说一句..在您的原始代码中,您做了类似全局 exit_flag 的操作:在多处理中永远不可能有“全局” exit_flag,因为它根本不是全局的,因为您使用的是具有独立内存空间的独立进程。这只适用于可以共享状态的线程环境。如果您在多处理中需要它,那么您必须在进程之间进行显式通信(管道和队列实现这一点)或诸如共享内存对象之类的东西
| 归档时间: |
|
| 查看次数: |
8906 次 |
| 最近记录: |