键盘中断python的多处理池

Fra*_*rth 127 python pool multiprocessing keyboardinterrupt

如何使用python的多处理池处理KeyboardInterrupt事件?这是一个简单的例子:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()
Run Code Online (Sandbox Code Playgroud)

当运行上面的代码时,KeyboardInterrupt当我按下时会引发上升^C,但是该过程只是挂起,我必须在外部杀死它.

我希望能够随时按下^C并使所有进程正常退出.

Gle*_*ard 135

这是一个Python bug.在等待threading.Condition.wait()中的条件时,从不发送KeyboardInterrupt.摄制:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"
Run Code Online (Sandbox Code Playgroud)

在wait()返回之前,不会传递KeyboardInterrupt异常,并且它永远不会返回,因此中断永远不会发生.KeyboardInterrupt几乎肯定会中断条件等待.

请注意,如果指定了超时,则不会发生这种情况; cond.wait(1)将立即收到中断.因此,解决方法是指定超时.要做到这一点,请更换

    results = pool.map(slowly_square, range(40))
Run Code Online (Sandbox Code Playgroud)

    results = pool.map_async(slowly_square, range(40)).get(9999999)
Run Code Online (Sandbox Code Playgroud)

或类似的.

  • 此错误已作为[问题8296] [1]提交.[1]:http://bugs.python.org/issue8296 (18认同)
  • @GlennMaynard我刚刚添加了你为godssake提到的问题的链接!并且由于编辑应该是实质性的自由添加一些格式标签.哪个在SO规则中很好. (13认同)
  • @GlennMaynard据我所知,这个系统明确允许编辑其他帖子来增强它们.所以我没有真正看到问题,让我推荐你:https://stackoverflow.com/tour (9认同)
  • 这并不能解决问题.有时我按Control + C时会得到预期的行为,有时则没有.我不确定为什么,但看起来可能随机的一个进程接收到KeyboardInterrupt,如果父进程是捕获它的那个,我只能得到正确的行为. (4认同)
  • Jehejj,到了 2019 年,这个问题仍然没有解决。就像并行 IO 一样,这是一个新想法:/ (4认同)
  • 官方python跟踪器中的这个bug在哪里?我找不到它,但我可能只是没有使用最好的搜索词. (3认同)
  • 对于Windows上的Python 3.6.1,这不适合我.当我执行Ctrl-C时,我得到大量的堆栈跟踪和其他垃圾,即没有这样的解决方法.事实上,我从这个线程尝试过的解决方案似乎都没有... (3认同)

小智 49

根据我最近发现的,最好的解决方案是将工作进程设置为完全忽略SIGINT,并将所有清理代码限制在父进程中.这解决了空闲和繁忙工作进程的问题,并且不需要子进程中的错误处理代码.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()
Run Code Online (Sandbox Code Playgroud)

可以在http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/http://github.com/jreese/multiprocessing-keyboardinterrupt分别找到说明和完整示例代码.

  • 你好,约翰.你的解决方案并没有完成与我相同的事情,是不幸的复杂的解决方案.它隐藏在主进程中的`time.sleep(10)`后面.如果您要删除该睡眠,或者如果您等到该进程尝试加入池,您必须这样做以保证作业完成,那么您仍然遇到同样的问题,这是主要的进程没有在等待轮询`join`操作时它不会收到KeyboardInterrupt. (4认同)
  • 这不起作用.只有孩子被发送信号.父节点从不接收它,所以`pool.terminate()`永远不会被执行.让孩子忽视信号什么也没有完成.@ Glenn的回答解决了这个问题. (4认同)
  • 因此,通过另一种方法轮询进程完成而不是加入,这是一次繁忙的等待(可能是检查之间的小睡眠).如果是这种情况,也许最好在您的博客文章中包含此代码,因为您可以在尝试加入之前保证所有工作人员都已完成. (2认同)
  • 我的版本在 https://gist.github.com/admackin/003dd646e5fadee8b8d6 ;除了在中断时它不会调用 `.join()` - 它只是使用 `AsyncResult.ready()` 手动检查 `.apply_async()` 的结果以查看它是否准备就绪,这意味着我们已经干净利落地完成了。 (2认同)

And*_*ikh 29

由于某些原因,只能Exception正常处理从基类继承的异常.作为一种解决方法,您可以重新提升您KeyboardInterruptException实例:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

通常你会得到以下输出:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end
Run Code Online (Sandbox Code Playgroud)

所以,如果你点击^C,你会得到:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
Run Code Online (Sandbox Code Playgroud)

  • 看来这不是一个完整的解决方案.如果在`multiprocessing`执行自己的IPC数据交换时到达`KeyboardInterrupt`,那么`try..catch`将不会被激活(显然). (2认同)

Boo*_*boo 17

其中许多答案都是旧的,并且/或者如果您正在执行诸如 之类的方法,Pool.map它们似乎不适用于Windows 上的更高版本的 Python(我正在运行 3.8.5) ,该方法会阻塞,直到所有提交的任务完成。以下是我的解决方案。

  1. signal.signal(signal.SIGINT, signal.SIG_IGN)在主进程中发出调用以完全忽略 Ctrl-C。
  2. 处理池将使用池初始值设定项进行初始化,该池初始值设定项将这样初始化每个处理器:全局变量ctrl_c_entered将设置为,并且将发出False调用以最初忽略 Ctrl-C。该调用的返回值将被保存;这是原始的默认处理程序,重新建立时允许处理异常。signal.signal(signal.SIGINT, signal.SIG_IGN)KyboardInterrupt
  3. 装饰器handle_ctrl_c可以用于装饰多处理函数和方法,这些函数和方法应在输入 Ctrl-C 时立即退出。该装饰器将测试是否ctrl_c_entered设置了全局标志,如果设置了,则甚至懒得运行该函数/方法,而是返回一个KeyboardInterrupt异常实例。否则,将建立 a 的 try/catch 处理程序KeyboardInterrupt,并调用修饰的函数/方法。如果输入 Ctrl-C,则全局ctrl_c_entered将设置为TrueKeyboardInterrupt返回异常实例。无论如何,在返回之前装饰器都会重新建立 SIG_IGN 处理程序。

KeyBoardInterrupt本质上,所有提交的任务都将被允许启动,但一旦输入 Ctrl-C,将立即终止并返回异常值。主进程可以测试返回值是否存在这样的返回值,以检测是否输入了Ctrl-C。

from multiprocessing import Pool
import signal
from time import sleep
from functools import wraps

def handle_ctrl_c(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        global ctrl_c_entered
        if not ctrl_c_entered:
            signal.signal(signal.SIGINT, default_sigint_handler) # the default
            try:
                return func(*args, **kwargs)
            except KeyboardInterrupt:
                ctrl_c_entered = True
                return KeyboardInterrupt()
            finally:
                signal.signal(signal.SIGINT, pool_ctrl_c_handler)
        else:
            return KeyboardInterrupt()
    return wrapper

@handle_ctrl_c
def slowly_square(i):
    sleep(1)
    return i*i

def pool_ctrl_c_handler(*args, **kwargs):
    global ctrl_c_entered
    ctrl_c_entered = True

def init_pool():
    # set global variable for each process in the pool:
    global ctrl_c_entered
    global default_sigint_handler
    ctrl_c_entered = False
    default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)

def main():
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = Pool(initializer=init_pool)
    results = pool.map(slowly_square, range(10))
    if any(map(lambda x: isinstance(x, KeyboardInterrupt), results)):
        print('Ctrl-C was entered.')
    print(results)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

印刷:

Ctrl-C was entered.
[0, 1, 4, 9, 16, 25, 36, 49, KeyboardInterrupt(), KeyboardInterrupt()]
Run Code Online (Sandbox Code Playgroud)


小智 7

通常这个简单的结构适用于Ctrl- C在池上:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)
Run Code Online (Sandbox Code Playgroud)

正如在几个类似的帖子中所述:

在没有try-except的情况下捕获Python中的keyboardinterrupt


Pau*_*ice 5

似乎有两个问题在多处理烦人时会产生异常.第一个(由Glenn指出)是你需要使用map_async超时而不是map为了获得立即响应(即,不完成处理整个列表).第二个(由Andrey指出)是多处理不捕获不从Exception(例如SystemExit)继承的异常.所以这是我的解决方案,处理这两个:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results
Run Code Online (Sandbox Code Playgroud)


小智 5

我是Python新手。我到处寻找答案,偶然发现了这个以及其他一些博客和 YouTube 视频。我尝试复制粘贴上面作者的代码,并在 Windows 7 64 位中的 python 2.7.13 上重现它。这很接近我想要实现的目标。

我让我的子进程忽略 ControlC 并使父进程终止。看起来绕过子进程确实可以避免这个问题。

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()
Run Code Online (Sandbox Code Playgroud)

从 开始的部分pool.terminate()似乎永远不会执行。


nox*_*fox 5

投票表决的答案不能解决核心问题,但具有类似的副作用。

多处理库的作者Jesse Noller解释了multiprocessing.Pool在旧博客中使用CTRL + C时如何正确处理。

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()
Run Code Online (Sandbox Code Playgroud)

  • 这个解决方案似乎也可以防止 Ctrl-C 中断主进程。 (2认同)