ThreadPoolExecutor 键盘中断

she*_*idp 5 python threadpoolexecutor concurrent.futures

我有以下代码,它使用并发.futures.ThreadPoolExecutor 以计量方式启动另一个程序的进程(一次不超过 30 个)。我还希望能够在 ctrl-C python 进程时停止所有工作。这段代码的工作有一个警告:我必须按 ctrl-C 两次。我第一次发送 SIGINT 时,没有任何反应;第二次,我看到“向进程发送 SIGKILL”,进程死亡,但它起作用了。我的第一个 SIGINT 发生了什么?

execution_list = [['prog', 'arg1'], ['prog', 'arg2']] ... etc
processes = []

def launch_instance(args):
    process = subprocess.Popen(args)
    processes.append(process)
    process.wait()

try:
    with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
        results = list(executor.map(launch_instance, execution_list))
except KeyboardInterrupt:
    print('sending SIGKILL to processes')
    for p in processes:
        if p.poll() is None: #If process is still alive
            p.send_signal(signal.SIGKILL)
Run Code Online (Sandbox Code Playgroud)

Flo*_*anK 1

我在尝试解决类似问题时偶然发现了你的问题。不能 100% 确定它会解决您的用例(我没有使用子流程),但我认为它会。

executor只要作业仍在运行,您的代码就会保留在上下文管理器中。我有根据的猜测是,第一个 KeyboardInterrupt 将被 ThreadPoolExecutor 捕获,其默认行为是不启动任何新作业,等到当前作业完成,然后进行清理(并可能重新引发 KeyboardInterrupt)。但这些进程可能会长时间运行,因此您不会注意到。然后,第二个 KeyboardInterrupt 会中断此错误处理。

我如何解决我的问题(单独线程中的无限后台进程)是使用以下代码:

from concurrent.futures import ThreadPoolExecutor
import signal
import threading
from time import sleep


def loop_worker(exiting):
    while not exiting.is_set():
        try:
            print("started work")
            sleep(10)
            print("finished work")
        except KeyboardInterrupt:
            print("caught keyboardinterrupt")  # never caught here. just for demonstration purposes


def loop_in_worker():
    exiting = threading.Event()
    def signal_handler(signum, frame):
        print("Setting exiting event")
        exiting.set()

    signal.signal(signal.SIGTERM, signal_handler)
    with ThreadPoolExecutor(max_workers=1) as executor:
        executor.submit(loop_worker, exiting)

        try:
            while not exiting.is_set():
                sleep(1)
                print('waiting')
        except KeyboardInterrupt:
            print('Caught keyboardinterrupt')
            exiting.set()
    print("Main thread finished (and thus all others)")


if __name__ == '__main__':
    loop_in_worker()
Run Code Online (Sandbox Code Playgroud)

它使用事件向线程发出信号,指示它们应该停止正在做的事情。在主循环中,有一个循环只是为了保持忙碌并检查是否有任何异常。请注意,此循环位于ThreadPoolExecutor.

作为奖励,它还通过使用相同的exiting事件来处理 SIGTERM 信号。

如果您在两者之间添加一个循环processes.append(process)process.wait()检查信号,那么它也可能会解决您的用例。这取决于您想要对正在运行的进程执行什么操作,您应该在那里采取哪些操作。

如果您从命令行运行我的脚本并按 ctrl-C 您应该看到类似以下内容:

started work
waiting
waiting
^CCaught keyboardinterrupt

   # some time passes here

finished work
Main thread finished (and thus all others)
Run Code Online (Sandbox Code Playgroud)

我的解决方案的灵感来自这篇博文