退出多处理脚本

rip*_*pat 6 python multiprocessing python-3.5

当目标函数抛出错误时,我试图退出多处理脚本,但父进程没有退出,而是挂起。

这是我用来复制问题的测试脚本:

#!/usr/bin/python3.5

import time, multiprocessing as mp

def myWait(wait, resultQueue):
    startedAt = time.strftime("%H:%M:%S", time.localtime())
    time.sleep(wait)
    endedAt = time.strftime("%H:%M:%S", time.localtime())
    name = mp.current_process().name
    resultQueue.put((name, wait, startedAt, endedAt))

# queue initialisation
resultQueue = mp.Queue()

# process creation arg: (process number, sleep time, queue)
proc =  [
    mp.Process(target=myWait, name = ' _One_', args=(2, resultQueue,)),
    mp.Process(target=myWait, name = ' _Two_', args=(2, resultQueue,))
    ]

# starting processes
for p in proc:
    p.start()

for p in proc:
    p.join()

# print results
results = {}
for p in proc:
    name, wait, startedAt, endedAt = resultQueue.get()
    print('Process %s started at %s wait %s ended at %s' % (name, startedAt, wait, endedAt))
Run Code Online (Sandbox Code Playgroud)

这工作得很好,我可以看到父脚本产生两个子进程,htop但是当我想强制父脚本退出时,如果myWait目标函数中抛出错误,父进程只是挂起,甚至不产生任何子进程。我必须ctrl-c杀死它。

def myWait(wait, resultQueue):
    try:
        # do something wrong
    except:
        raise SystemExit
Run Code Online (Sandbox Code Playgroud)

我已经想尽办法退出的功能(例如exit()sys.exit()os._exit()...)无济于事。

Chr*_*erC 2

首先,您的代码有一个主要问题:您尝试在刷新队列内容(如果有)之前加入进程,这可能会导致死锁。请参阅此处标题为“加入使用队列的进程”的部分:https ://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming

其次,对 的调用resultQueue.get()将阻塞,直到收到一些数据,如果函数引发异常myWait并且在此之前没有数据被推送到队列中,则这种情况永远不会发生。因此,让它成为非阻塞的,并让它检查循环中的任何数据,直到它最终收到某些数据或出现问题。

这是一个快速修复方法,可以帮助您解决这个问题:

#!/usr/bin/python3.5

import multiprocessing as mp
import queue
import time

def myWait(wait, resultQueue):
    raise Exception("error!")

# queue initialisation
resultQueue = mp.Queue()

# process creation arg: (process number, sleep time, queue)
proc =  [
    mp.Process(target=myWait, name = ' _One_', args=(2, resultQueue,)),
    mp.Process(target=myWait, name = ' _Two_', args=(2, resultQueue,))
    ]

# starting processes
for p in proc:
    p.start()

# print results
results = {}
for p in proc:
    while True:
        if not p.is_alive():
            break

        try:
            name, wait, startedAt, endedAt = resultQueue.get(block=False)
            print('Process %s started at %s wait %s ended at %s'
                  % (name, startedAt, wait, endedAt))
            break
        except queue.Empty:
            pass

for p in proc:
    p.join()
Run Code Online (Sandbox Code Playgroud)

该函数myWait将抛出异常,但两个进程仍将加入,并且程序将正常退出。