在Python中停止ThreadPool中的进程

SRD*_*SRD 7 python threadpool

我一直在尝试为控制某些硬件的库编写一个交互式包装器(用于ipython).有些调用对IO很重要,因此并行执行任务是有意义的.使用ThreadPool(几乎)很好地工作:

from multiprocessing.pool import ThreadPool

class hardware():
    def __init__(IPaddress):
        connect_to_hardware(IPaddress)

    def some_long_task_to_hardware(wtime):
        wait(wtime)
        result = 'blah'
        return result

pool = ThreadPool(processes=4)
Threads=[]
h=[hardware(IP1),hardware(IP2),hardware(IP3),hardware(IP4)]
for tt in range(4):
    task=pool.apply_async(h[tt].some_long_task_to_hardware,(1000))
    threads.append(task)
alive = [True]*4
Try:
    while any(alive) :
        for tt in range(4): alive[tt] = not threads[tt].ready()
        do_other_stuff_for_a_bit()
except:
    #some command I cannot find that will stop the threads...
    raise
for tt in range(4): print(threads[tt].get())
Run Code Online (Sandbox Code Playgroud)

如果用户想要停止进程或者出现IO错误,则会出现问题do_other_stuff_for_a_bit().按Ctrl+ C停止主进程,但工作线程继续运行,直到当前任务完成.
有没有办法阻止这些线程,而不必重写库或让用户退出python? pool.terminate()pool.join()我在其他例子中看到的似乎并没有做到这一点.

实际例程(而不是上面的简化版本)使用日志记录,虽然所有工作线程都在某个时刻关闭,但我可以看到它们开始运行的进程一直持续到完成(并且硬件我可以通过查看看到它们的效果穿过房间).

这是在python 2.7中.

更新:

解决方案似乎是切换到使用multiprocessing.Process而不是线程池.我试过的测试代码是运行foo_pulse:

class foo(object):
    def foo_pulse(self,nPulse,name): #just one method of *many*
        print('starting pulse for '+name)
        result=[]
        for ii in range(nPulse):
            print('on for '+name)
            time.sleep(2)
            print('off for '+name)
            time.sleep(2)
            result.append(ii)
        return result,name
Run Code Online (Sandbox Code Playgroud)

如果你尝试使用ThreadPool运行它,那么ctrl-C不会阻止foo_pulse运行(即使它确实会立即杀死线程,打印语句仍在继续:

from multiprocessing.pool import ThreadPool
import time
def test(nPulse):
    a=foo()
    pool=ThreadPool(processes=4)
    threads=[]
    for rn in range(4) :
        r=pool.apply_async(a.foo_pulse,(nPulse,'loop '+str(rn)))
        threads.append(r)
    alive=[True]*4
    try:
        while any(alive) : #wait until all threads complete
            for rn in range(4):
                alive[rn] = not threads[rn].ready() 
                time.sleep(1)
    except : #stop threads if user presses ctrl-c
        print('trying to stop threads')
        pool.terminate()
        print('stopped threads') # this line prints but output from foo_pulse carried on.
        raise
    else : 
        for t in threads : print(t.get())
Run Code Online (Sandbox Code Playgroud)

但是,使用multiprocessing.Process的版本按预期工作:

import multiprocessing as mp
import time
def test_pro(nPulse):
    pros=[]
    ans=[]
    a=foo()
    for rn in range(4) :
        q=mp.Queue()
        ans.append(q)
        r=mp.Process(target=wrapper,args=(a,"foo_pulse",q),kwargs={'args':(nPulse,'loop '+str(rn))})
        r.start()
        pros.append(r)
    try:
        for p in pros : p.join()
        print('all done')
    except : #stop threads if user stops findRes
        print('trying to stop threads')
        for p in pros : p.terminate()
        print('stopped threads')
    else : 
        print('output here')
        for q in ans :
            print(q.get())
    print('exit time')
Run Code Online (Sandbox Code Playgroud)

我已经为库foo定义了一个包装器(因此它不需要重写).如果不需要返回值,则此包装器也不是:

def wrapper(a,target,q,args=(),kwargs={}):
    '''Used when return value is wanted'''
    q.put(getattr(a,target)(*args,**kwargs))
Run Code Online (Sandbox Code Playgroud)

从文档中我看到没有理由为什么池不起作用(除了bug).

cha*_*elo 3

这是并行性的一个非常有趣的用途。

但是,如果您使用multiprocessing,目标是让多个进程并行运行,而不是一个进程运行多个线程。

考虑使用以下一些更改来实现它multiprocessing

您有以下将并行运行的函数:

import time
import multiprocessing as mp


def some_long_task_from_library(wtime):
    time.sleep(wtime)


class MyException(Exception): pass

def do_other_stuff_for_a_bit():
    time.sleep(5)
    raise MyException("Something Happened...")
Run Code Online (Sandbox Code Playgroud)

让我们创建并启动进程,例如 4:

procs = []  # this is not a Pool, it is just a way to handle the
            # processes instead of calling them p1, p2, p3, p4...
for _ in range(4):
    p = mp.Process(target=some_long_task_from_library, args=(1000,))
    p.start()
    procs.append(p)
mp.active_children()   # this joins all the started processes, and runs them.
Run Code Online (Sandbox Code Playgroud)

这些进程可能是在单独的 cpu 核心中并行运行,但这由操作系统决定。您可以检查系统监视器。

与此同时,您运行一个将中断的进程,并且您希望停止正在运行的进程,而不是让它们成为孤儿:

try:
    do_other_stuff_for_a_bit()
except MyException as exc:
    print(exc)
    print("Now stopping all processes...")
    for p in procs:
        p.terminate()
print("The rest of the process will continue")
Run Code Online (Sandbox Code Playgroud)

如果当一个或所有子进程终止时继续主进程没有意义,则应该处理主程序的退出。

希望它有所帮助,并且您可以为您的图书馆调整其中的一些内容。