she*_*idp 5 python threadpoolexecutor concurrent.futures
我有以下代码,它使用并发.futures.ThreadPoolExecutor 以计量方式启动另一个程序的进程(一次不超过 30 个)。我还希望能够在 ctrl-C python 进程时停止所有工作。这段代码的工作有一个警告:我必须按 ctrl-C 两次。我第一次发送 SIGINT 时,没有任何反应;第二次,我看到“向进程发送 SIGKILL”,进程死亡,但它起作用了。我的第一个 SIGINT 发生了什么?
execution_list = [['prog', 'arg1'], ['prog', 'arg2']] ... etc
processes = []
def launch_instance(args):
process = subprocess.Popen(args)
processes.append(process)
process.wait()
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
results = list(executor.map(launch_instance, execution_list))
except KeyboardInterrupt:
print('sending SIGKILL to processes')
for p in processes:
if p.poll() is None: #If process is still alive
p.send_signal(signal.SIGKILL)
Run Code Online (Sandbox Code Playgroud)
我在尝试解决类似问题时偶然发现了你的问题。不能 100% 确定它会解决您的用例(我没有使用子流程),但我认为它会。
executor只要作业仍在运行,您的代码就会保留在上下文管理器中。我有根据的猜测是,第一个 KeyboardInterrupt 将被 ThreadPoolExecutor 捕获,其默认行为是不启动任何新作业,等到当前作业完成,然后进行清理(并可能重新引发 KeyboardInterrupt)。但这些进程可能会长时间运行,因此您不会注意到。然后,第二个 KeyboardInterrupt 会中断此错误处理。
我如何解决我的问题(单独线程中的无限后台进程)是使用以下代码:
from concurrent.futures import ThreadPoolExecutor
import signal
import threading
from time import sleep
def loop_worker(exiting):
while not exiting.is_set():
try:
print("started work")
sleep(10)
print("finished work")
except KeyboardInterrupt:
print("caught keyboardinterrupt") # never caught here. just for demonstration purposes
def loop_in_worker():
exiting = threading.Event()
def signal_handler(signum, frame):
print("Setting exiting event")
exiting.set()
signal.signal(signal.SIGTERM, signal_handler)
with ThreadPoolExecutor(max_workers=1) as executor:
executor.submit(loop_worker, exiting)
try:
while not exiting.is_set():
sleep(1)
print('waiting')
except KeyboardInterrupt:
print('Caught keyboardinterrupt')
exiting.set()
print("Main thread finished (and thus all others)")
if __name__ == '__main__':
loop_in_worker()
Run Code Online (Sandbox Code Playgroud)
它使用事件向线程发出信号,指示它们应该停止正在做的事情。在主循环中,有一个循环只是为了保持忙碌并检查是否有任何异常。请注意,此循环位于ThreadPoolExecutor.
作为奖励,它还通过使用相同的exiting事件来处理 SIGTERM 信号。
如果您在两者之间添加一个循环processes.append(process)来process.wait()检查信号,那么它也可能会解决您的用例。这取决于您想要对正在运行的进程执行什么操作,您应该在那里采取哪些操作。
如果您从命令行运行我的脚本并按 ctrl-C 您应该看到类似以下内容:
started work
waiting
waiting
^CCaught keyboardinterrupt
# some time passes here
finished work
Main thread finished (and thus all others)
Run Code Online (Sandbox Code Playgroud)
我的解决方案的灵感来自这篇博文
| 归档时间: |
|
| 查看次数: |
5336 次 |
| 最近记录: |