在Python的调用者线程中捕获线程的异常

Pha*_*nto 178 python multithreading exception-handling exception

我对Python和多线程编程很新.基本上,我有一个脚本,将文件复制到另一个位置.我希望将它放在另一个线程中,以便我可以输出....以指示脚本仍在运行.

我遇到的问题是,如果文件无法复制,它将引发异常.如果在主线程中运行,这是可以的; 但是,具有以下代码不起作用:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"
Run Code Online (Sandbox Code Playgroud)

在线程类本身,我试图重新抛出异常,但它不起作用.我看到这里的人问类似的问题,但他们似乎都在做一些比我想做的更具体的事情(而且我不太了解提供的解决方案).我见过人们提到使用它sys.exc_info(),但我不知道在哪里或如何使用它.

非常感谢所有帮助!

编辑:线程类的代码如下:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder

    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise
Run Code Online (Sandbox Code Playgroud)

San*_*nta 106

问题是thread_obj.start()立即返回.您生成的子线程在其自己的上下文中执行,具有自己的堆栈.发生在那里的任何异常都存在于子线程的上下文中,并且它位于自己的堆栈中.我现在可以想到将这些信息传递给父线程的一种方法是使用某种消息传递,所以你可能会考虑这一点.

试试这个尺码:

import sys
import threading
import Queue


class ExcThread(threading.Thread):

    def __init__(self, bucket):
        threading.Thread.__init__(self)
        self.bucket = bucket

    def run(self):
        try:
            raise Exception('An error occured here.')
        except Exception:
            self.bucket.put(sys.exc_info())


def main():
    bucket = Queue.Queue()
    thread_obj = ExcThread(bucket)
    thread_obj.start()

    while True:
        try:
            exc = bucket.get(block=False)
        except Queue.Empty:
            pass
        else:
            exc_type, exc_obj, exc_trace = exc
            # deal with the exception
            print exc_type, exc_obj
            print exc_trace

        thread_obj.join(0.1)
        if thread_obj.isAlive():
            continue
        else:
            break


if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

  • 为什么不加入线程而不是这个丑陋的while循环?请参阅`multiprocessing`等价物:https://gist.github.com/2311116 (5认同)
  • 为什么不使用基于 @Lasse 答案的 EventHook 模式 http://stackoverflow.com/questions/1092531/event-system-in-python/1094423#1094423 ?而不是循环的事情? (3认同)
  • 这对我来说似乎不安全。当线程在“bucket.get()”引发“Queue.Empty”之后立即引发异常时会发生什么?然后线程“join(0.1)”将完成,并且“isAlive() 为 False”,并且您会错过异常。 (3认同)
  • 在这种简单的情况下,`Queue` 是不必要的——你可以将异常信息存储为 `ExcThread` 的一个属性,只要你确保 `run()` 在异常发生后立即完成(它在这个简单的例子)。然后你只需在`t.join()`之后(或期间)重新引发异常。没有同步问题,因为`join()` 确保线程已经完成。请参阅下面的 Rok Strniša 的回答 /sf/answers/855648531/ (3认同)
  • 队列不是传达错误的最佳工具,除非您想拥有一个完整的队列。一个更好的构造是 threading.Event() (2认同)

Jon*_*ric 35

concurrent.futures模块使在单独的线程(或进程)中工作变得简单,并处理任何产生的异常:

import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return the value returned by shutil.copytree(), None.
        # Raise any exceptions raised during the copy process.
        return future.result()
Run Code Online (Sandbox Code Playgroud)

concurrent.futures包含在Python 3.2中,可用作早期版本的反向移植futures模块.

  • 虽然这并不完全符合OP的要求,但这正是我所需要的提示.谢谢. (4认同)
  • 这段代码阻塞了主线程。你如何异步执行此操作? (4认同)
  • 使用`concurrent.futures.as_completed`,您可以在引发异常时立即得到通知:/sf/ask/198053061/ python / 53779560#53779560 (2认同)

Mat*_*bos 27

虽然不可能直接捕获在不同线程中抛出的异常,但这里有一个代码可以非常透明地获得非常接近此功能的东西.您的子线程必须是子ExThread类而不是子类,threading.Thread并且父线程必须调用该child_thread.join_with_exception()方法,而不是child_thread.join()在等待线程完成其作业时.

这个实现的技术细节:当子线程抛出异常时,它通过a传递给Queue父线程并在父线程中再次抛出.请注意,这种方法没有繁忙的等待.

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except BaseException:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    def run_with_exception(self):
        thread_name = threading.current_thread().name
        raise MyException("An error in thread '{}'.".format(thread_name))

def main():
    t = MyThread()
    t.start()
    try:
        t.join_with_exception()
    except MyException as ex:
        thread_name = threading.current_thread().name
        print "Caught a MyException in thread '{}': {}".format(thread_name, ex)

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)


Art*_*are 21

这个问题有很多非常复杂的答案.我是否过度简化了这一点,因为这对我来说似乎已经足够了.

from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self):
        super(PropagatingThread, self).join()
        if self.exc:
            raise self.exc
        return self.ret
Run Code Online (Sandbox Code Playgroud)

如果你确定你只会在一个或另一个版本的Python上运行,那么你可以将run()方法减少到仅受损的版本(如果你只在3之前的Python版本上运行),或者只是干净的版本(如果你只在从3开始的Python版本上运行).

用法示例:

def f(*args, **kwargs)
    print(args)
    print(kwargs)
    raise Exception('I suck')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()
Run Code Online (Sandbox Code Playgroud)

当你加入时,你会看到另一个线程上引发的异常.

  • 为了取代线程,连接应该包含一个超时参数。`def join(self, timeout=None): super(PropagatingThread, self).join(timeout) if self.exc: raise self.exc return self.ret` (4认同)
  • 我建议 run 方法采用这个更简单的实现:`super(PropagatingThread, self).run()` (3认同)
  • 我也不确定为什么这个答案不更受欢迎。这里还有其他人也进行简单的传播,但需要扩展类并覆盖。这正是许多人所期望的,并且只需要从 Thread 更改为 ProagatingThread。还有 4 个空格选项卡,所以我的复制/粘贴很简单:-) ...我建议的唯一改进是使用 Six.raise_from() ,以便您获得一组漂亮的嵌套堆栈跟踪,而不仅仅是堆栈再加注的地点。 (2认同)
  • 我的问题是我有多个子线程。连接按顺序执行,稍后连接的线程可能会引发异常。有一个简单的方法可以解决我的问题吗?同时运行连接? (2认同)
  • 如果使用 daemon=True 调用线程,它似乎不起作用。当线程中的代码失败时,我需要在线程中发出一条消息(使用 Flask socketio)。有办法处理吗? (2认同)
  • @SimonNdunda run() 不返回值,因此仅适用于不返回任何内容的函数。 (2认同)

Rok*_*iša 19

如果线程中发生异常,最好的方法是在调用程序线程中重新引发它join.您可以使用该sys.exc_info()函数获取有关当前正在处理的异常的信息.这个信息可以简单地存储为线程对象的属性,直到join被调用,此时可以重新引发它.

请注意,Queue.Queue在这种简单的情况下,(在其他答案中建议)不需要线程,其中线程抛出最多1个异常在抛出异常后立即完成.我们通过简单地等待线程完成来避免竞争条件.

例如,扩展ExcThread(下面),覆盖excRun(而不是run).

Python 2.x:

import threading

class ExcThread(threading.Thread):
  def excRun(self):
    pass

  def run(self):
    self.exc = None
    try:
      # Possibly throws an exception
      self.excRun()
    except:
      import sys
      self.exc = sys.exc_info()
      # Save details of the exception thrown but don't rethrow,
      # just complete the function

  def join(self):
    threading.Thread.join(self)
    if self.exc:
      msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
      new_exc = Exception(msg)
      raise new_exc.__class__, new_exc, self.exc[2]
Run Code Online (Sandbox Code Playgroud)

Python 3.x:

3参数形式raise在Python 3中消失了,所以将最后一行更改为:

raise new_exc.with_traceback(self.exc[2])
Run Code Online (Sandbox Code Playgroud)

  • 为什么使用 threading.Thread.join(self) 而不是 super(ExcThread, self).join()? (2认同)
  • [文档](https://docs.python.org/3/library/threading.html#thread-objects) 明确指出只有 `threading.Thread` 的 `__init__` 和 `run` 方法应该被重写,尽管他们没有解释为什么建议这样做。 (2认同)

ceb*_*bor 15

我正在做的是,简单地重写线程的 join 和 run 方法:

class RaisingThread(threading.Thread):
  def run(self):
    self._exc = None
    try:
      super().run()
    except Exception as e:
      self._exc = e

  def join(self, timeout=None):
    super().join(timeout=timeout)
    if self._exc:
      raise self._exc
Run Code Online (Sandbox Code Playgroud)

使用如下:

def foo():
  time.sleep(2)
  print('hi, from foo!')
  raise Exception('exception from foo')    

t = RaisingThread(target=foo)
t.start()
try:
  t.join()
except Exception as e:
  print(e)
Run Code Online (Sandbox Code Playgroud)

结果:

hi, from foo!
exception from foo!
Run Code Online (Sandbox Code Playgroud)

  • 太棒了!我在这里阅读了大部分答案,但直到我在尝试了各种方法(包括 threading.excepthook)之后想出了一个可行的解决方案时才注意到这个答案。我最终得到的结果与这个解决方案几乎相同:)它值得一些赞成票——就像梦想一样。 (2认同)

Cir*_*四事件 13

concurrent.futures.as_completed

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

以下解决方案:

  • 调用异常时立即返回主线程
  • 不需要额外的用户定义类,因为它不需要:
    • 明确的 Queue
    • 在您的工作线程周围添加一个除了 else

来源:

#!/usr/bin/env python3

import concurrent.futures
import time

def func_that_raises(do_raise):
    for i in range(3):
        print(i)
        time.sleep(0.1)
    if do_raise:
        raise Exception()
    for i in range(3):
        print(i)
        time.sleep(0.1)
    
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    futures.append(executor.submit(func_that_raises, False))
    futures.append(executor.submit(func_that_raises, True))
    for future in concurrent.futures.as_completed(futures):
        print(repr(future.exception()))
Run Code Online (Sandbox Code Playgroud)

可能的输出:

0
0
1
1
2
2
0
Exception()
1
2
None
Run Code Online (Sandbox Code Playgroud)

不幸的是,不可能杀死期货来取消其他期货,因为其中一个失败了:

如果您执行以下操作:

for future in concurrent.futures.as_completed(futures):
    if future.exception() is not None:
        raise future.exception()
Run Code Online (Sandbox Code Playgroud)

然后with捕获它,并在继续之前等待第二个线程完成。以下行为类似:

for future in concurrent.futures.as_completed(futures):
    future.result()
Run Code Online (Sandbox Code Playgroud)

因为future.result()如果发生异常,则会重新引发异常。

如果您想退出整个 Python 进程,您可以使用os._exit(0),但这可能意味着您需要重构。

具有完美异常语义的自定义类

我最终为自己编写了完美的界面:限制一次运行的最大线程数的正确方法?“带有错误处理的队列示例”部分。该课程旨在既方便又让您完全控制提交和结果/错误处理。

在 Python 3.6.7、Ubuntu 18.04 上测试。


Zho*_*gbo 9

在 Python 3.8 中,我们可以使用threading.excepthook来钩住所有子线程中未捕获的异常!例如,

threading.excepthook = thread_exception_handler
Run Code Online (Sandbox Code Playgroud)

推荐人:https : //stackoverflow.com/a/60002752/5093308

  • 这没有帮助,因为处理程序仍然在子线程而不是主线程中运行 (4认同)

Cal*_*dge 6

这是一个令人讨厌的小问题,我想提出我的解决方案。我发现的其他一些解决方案(例如 async.io)看起来很有希望,但也呈现出一些黑匣子。队列/事件循环方法将您与某个实现联系起来。但是并发期货源代码只有 1000 行左右,易于理解。它让我可以轻松解决我的问题:无需太多设置即可创建临时工作线程,并且能够在主线程中捕获异常。

我的解决方案使用并发期货 API 和线程 API。它允许您创建一个工人,为您提供线程和未来。这样就可以加入线程等待结果了:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())
Run Code Online (Sandbox Code Playgroud)

...或者您可以让工作人员在完成后发送回调:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))
Run Code Online (Sandbox Code Playgroud)

...或者你可以循环直到事件完成:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)
Run Code Online (Sandbox Code Playgroud)

这是代码:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if(self._cb):
            self._cb(self.future.result())
Run Code Online (Sandbox Code Playgroud)

...和测试功能:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')
Run Code Online (Sandbox Code Playgroud)


小智 5

我知道我来晚了一点,但我遇到了一个非常相似的问题,但它包括使用 tkinter 作为 GUI,并且主循环使得无法使用任何依赖于 .join() 的解决方案。因此,我调整了原始问题的编辑中给出的解决方案,但使其更加通用,以便其他人更容易理解。

这是正在运行的新线程类:

import threading
import traceback
import logging


class ExceptionThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except Exception:
            logging.error(traceback.format_exc())


def test_function_1(input):
    raise IndexError(input)


if __name__ == "__main__":
    input = 'useful'

    t1 = ExceptionThread(target=test_function_1, args=[input])
    t1.start()
Run Code Online (Sandbox Code Playgroud)

当然,您始终可以让它通过日志记录以其他方式处理异常,例如将其打印出来,或将其输出到控制台。

这允许您像使用 Thread 类一样使用 ExceptionThread 类,无需任何特殊修改。