How to exit a script after ThreadPoolExecutor has timed out

Arn*_*sen 4 python-multithreading python-3.x

I'm trying to cleanly exit after some work times out because a thread is being blocked by something. I'm using a ThreadPoolExecutor like this:

import concurrent.futures

try:
  with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor:
    # submit some work
    workers = [executor.submit(...) for x in work]
    # wait for completion
    try:
      for f in concurrent.futures.as_completed(workers, timeout=60):
        f.result()
    except concurrent.futures.TimeoutError:
      raise TimeoutError()
except TimeoutError:
  # cleanup
  pass

This code gets to # cleanup no problem, but the script never exits because it's waiting for that blocked thread to finally finish. I do not yet know what is causing the worker to block forever. That's another issue to solve, but I need a way to at least exit when we run into this scenario.

I looked at how threads in ThreadPoolExecutor are created and they are being set to daemon = True so I'm doubly confused why these threads are blocking the application from exiting.

Nat*_*eks 6

Oddly enough, this is intended behavior. From concurrent/futures/thread.py (version 3.6.3):

# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.

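"This problem" is exactly the behavior you want: exiting while worker threads are still running. The exit handler mentioned in that comment calls join() on every worker thread, which blocks forever if any of them is stuck: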

def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()

atexit.register(_python_exit)
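To see why that join() never returns, here is a minimal sketch that uses a plain thread instead of the executor. The names stuck_worker and release are hypothetical stand-ins for whatever is blocking the real worker, and a timeout is passed to join() only so the demo can observe the blocked state and then finish.

```python
import threading

# Hypothetical stand-in for the unknown thing the real worker is stuck on.
release = threading.Event()

def stuck_worker():
    release.wait()  # blocks until release.set() is called

t = threading.Thread(target=stuck_worker, daemon=True)
t.start()

# join() without a timeout would block here for as long as the worker is
# stuck, which is what the exit handler does. The timeout lets us look instead.
t.join(timeout=0.2)
still_blocked = t.is_alive()
print("worker still alive after join(timeout=0.2):", still_blocked)

# Unblock the worker so the demo itself can end cleanly.
release.set()
t.join()
finished = not t.is_alive()
```

Note that daemon=True does not help here: the join() happens in an exit handler, before the interpreter gets to the point where daemon threads would be abandoned.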

There is also the __exit__ method of ThreadPoolExecutor itself (inherited from the Executor base class):

def __exit__(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait=True)
    return False

self.shutdown, called with wait=True, also joins all worker threads.
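The difference the wait argument makes can be sketched as follows. The release event is again a hypothetical stand-in for the blocker; it is set at the end only so that the demo terminates.

```python
import concurrent.futures
import threading

release = threading.Event()  # hypothetical stand-in for whatever blocks the worker

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(release.wait)  # this task blocks until release.set()

# wait=False returns immediately without joining the worker thread.
executor.shutdown(wait=False)
returned_while_blocked = not future.done()
print("shutdown(wait=False) returned with task still blocked:", returned_while_blocked)

# wait=True would have blocked above. Release the worker first, then join.
release.set()
executor.shutdown(wait=True)
result = future.result(timeout=5)  # Event.wait() returns True once the event is set
```

The with statement always goes through __exit__, and therefore through shutdown(wait=True), which is why the original code cannot leave the with block while a worker is stuck.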

To force an exit, we need to override both of these. If you modify your code as follows:

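except concurrent.futures.TimeoutError:
    import atexit
    # stop the exit handler from joining the worker threads at interpreter exit
    atexit.unregister(concurrent.futures.thread._python_exit)
    # make __exit__'s self.shutdown(wait=True) call a no-op for this executor
    executor.shutdown = lambda wait: None
    raise TimeoutError()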

then your script will exit as desired.
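Put together with the code from the question, the change might look like the sketch below. It assumes a hypothetical blocked_task and release event in place of the real work, and uses a short timeout so it runs quickly. It has only been reasoned about against the 3.6.3 source quoted above; other Python versions may register the exit handler differently, in which case the atexit.unregister call may have no effect.

```python
import atexit
import concurrent.futures
import concurrent.futures.thread
import threading

release = threading.Event()  # hypothetical: stands in for the unknown blocker

def blocked_task():
    release.wait()
    return "done"

timed_out = False
try:
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        workers = [executor.submit(blocked_task)]
        try:
            for f in concurrent.futures.as_completed(workers, timeout=0.2):
                f.result()
        except concurrent.futures.TimeoutError:
            # The two overrides described above.
            atexit.unregister(concurrent.futures.thread._python_exit)
            executor.shutdown = lambda wait: None
            raise TimeoutError()
except TimeoutError:
    timed_out = True
    # We got here even though the worker has not finished.
    left_with_block_early = not workers[0].done()
    print("reached cleanup while worker is still blocked:", left_with_block_early)

# Demo only: release the worker so this sketch does not leave a stuck thread.
release.set()
```

The override is set on the executor instance, so other executors in the same process keep their normal shutdown behavior.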