处理异常的Python线程池

xAp*_*ple 17 python multithreading exception-handling threadpool

我一直在寻找一个简单的python线程池模式的良好实现,真的找不到任何适合我的需求.我正在使用python 2.7,我发现的所有模块都不起作用,或者没有正确处理工作程序中的异常.我想知道是否有人知道可以提供我正在搜索的功能类型的库.非常感谢.

我的第一次尝试是使用内置multiprocessing模块,但由于这不使用线程而是使用子进程,而是遇到了无法对对象进行pickle的问题.不要去这里

from multiprocessing import Pool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = Pool(processes=8)
for s in samples: pool.apply_async(s.compute_fib, [20])
pool.join()
for s in samples: print s.fib

# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
Run Code Online (Sandbox Code Playgroud)

期货

所以,我看到有一些的蟒蛇3.2凉爽的并发功能回到端口这里.这似乎完美且易于使用.问题是,当你在一个工人一个例外,你只能得到异常的类型,比如"ZeroDivisionError",但没有回溯,因此没有迹象表明其中线导致异常.代码变得无法调试.不行.

from concurrent import futures

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = futures.ThreadPoolExecutor(max_workers=8)
threads = [pool.submit(s.compute_fib, 20) for s in samples]
futures.wait(threads, return_when=futures.FIRST_EXCEPTION)
for t in threads: t.result()
for s in samples: print s.fib


#    futures-2.1.3-py2.7.egg/concurrent/futures/_base.pyc in __get_result(self)
#    354     def __get_result(self):
#    355         if self._exception:
#--> 356             raise self._exception
#    357         else:
#    358             return self._result
#
# ZeroDivisionError: integer division or modulo by zero
Run Code Online (Sandbox Code Playgroud)

工作者池

我在这里找到了这种模式的其他实现.这次发生异常时会打印出来,但是我的ipython交互式解释器处于挂起状态,需要从其他shell中杀死.不行.

import workerpool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = workerpool.WorkerPool(size=8)
for s in samples: pool.map(s.compute_fib, [20])
pool.wait()
for s in samples: print s.fib

# ZeroDivisionError: integer division or modulo by zero
# ^C^C^C^C^C^C^C^C^D^D
# $ kill 1783
Run Code Online (Sandbox Code Playgroud)

线程池

然而,另一执行这里.当发生异常时这一次,它被打印到stderr,但剧本不被中断,而是继续执行,这违背了例外的目的,可以使事情不安全.仍然无法使用.

import threadpool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = threadpool.ThreadPool(8)
requests = [threadpool.makeRequests(s.compute_fib, [20]) for s in samples]
requests = [y for x in requests for y in x]
for r in requests: pool.putRequest(r)
pool.wait()
for s in samples: print s.fib

# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
#---> 17 for s in samples: print s.fib
#
#AttributeError: 'Sample' object has no attribute 'fib'
Run Code Online (Sandbox Code Playgroud)

- 更新 -

看来,关于futures库,python 3的行为与python 2不同.

futures_exceptions.py:

from concurrent.futures import ThreadPoolExecutor, as_completed

def div_zero(x):
    return x / 0

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = executor.map(div_zero, range(4))
    for future in as_completed(futures): print(future)
Run Code Online (Sandbox Code Playgroud)

Python 2.7.6输出:

Traceback (most recent call last):
  File "...futures_exceptions.py", line 12, in <module>
    for future in as_completed(futures):
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 198, in as_completed
    with _AcquireFutures(fs):
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 147, in __init__
    self.futures = sorted(futures, key=id)
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 549, in map
    yield future.result()
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 397, in result
    return self.__get_result()
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 356, in __get_result
    raise self._exception
ZeroDivisionError: integer division or modulo by zero
Run Code Online (Sandbox Code Playgroud)

Python 3.3.2输出:

Traceback (most recent call last):
  File "...futures_exceptions.py", line 11, in <module>
    for future in as_completed(futures):
  File "...python3.3/concurrent/futures/_base.py", line 193, in as_completed
    with _AcquireFutures(fs):
  File "...python3.3/concurrent/futures/_base.py", line 142, in __init__
    self.futures = sorted(futures, key=id)
  File "...python3.3/concurrent/futures/_base.py", line 546, in result_iterator
    yield future.result()
  File "...python3.3/concurrent/futures/_base.py", line 392, in result
    return self.__get_result()
  File "...python3.3/concurrent/futures/_base.py", line 351, in __get_result
    raise self._exception
  File "...python3.3/concurrent/futures/thread.py", line 54, in run
    result = self.fn(*self.args, **self.kwargs)
  File "...futures_exceptions.py", line 7, in div_zero
    return x / 0
ZeroDivisionError: division by zero
Run Code Online (Sandbox Code Playgroud)

se7*_*7en 5

我个人使用concurrent.futures的是界面非常简单。对于回溯问题,我找到了保留它的解决方法。查看我对另一个问题的回答:

获取并发中异常的原始行号。futures