我在使用Dask和Distributed开发数据分析管道方面取得了很大成功.然而,我仍然期待改进的一件事是我处理异常的方式.
现在,如果,我写下面的内容
def my_function (value):
return 1 / value
results = (dask.bag
.from_sequence(range(-10, 10))
.map(my_function))
print(results.compute())
Run Code Online (Sandbox Code Playgroud)
...然后在运行程序时,我得到一个很长很长的追溯列表(每个工人一个,我猜).最相关的部分是
distributed.utils - ERROR - division by zero
Traceback (most recent call last):
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/utils.py", line 193, in f
result[0] = yield gen.maybe_future(func(*args, **kwargs))
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/client.py", line 1473, in _get
result = yield self._gather(packed)
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/client.py", line 923, in _gather
st.traceback)
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/six.py", line 685, in reraise
raise value.with_traceback(tb)
File "/mnt/lustrefs/work/aurelien.mazurie/test_dask/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/dask/bag/core.py", line 1411, in reify
File "test.py", line 9, in my_function
return 1 / value
ZeroDivisionError: division by zero
Run Code Online (Sandbox Code Playgroud)
当然,在这里,视觉检查会告诉我错误是将数字除以零.我想知道的是,是否有更好的方法来跟踪这些错误.例如,我似乎无法捕获异常本身:
import dask.bag
import distributed
try:
dask_scheduler = "127.0.0.1:8786"
dask_client = distributed.Client(dask_scheduler)
def my_function (value):
return 1 / value
results = (dask.bag
.from_sequence(range(-10, 10))
.map(my_function))
#dask_client.persist(results)
print(results.compute())
except Exception as e:
print("error: %s" % e)
Run Code Online (Sandbox Code Playgroud)
编辑:请注意,在我的示例中,我使用的是分布式,而不仅仅是dask.有一个dask-scheduler在端口8786听有四个dask-worker注册到它的过程.
这段代码将产生与上面完全相同的输出,这意味着我实际上并没有用我的try/ exceptblock来捕获异常.
现在,由于我们讨论的是跨群集的分布式任务,将异常传播回给我显然是非常重要的.有没有指导方针呢?现在我的解决方案是让函数返回结果和可选的错误消息,然后分别处理结果和错误消息:
def my_function (value):
try:
return {"result": 1 / value, "error": None}
except ZeroDivisionError:
return {"result": None, "error": "boom!"}
results = (dask.bag
.from_sequence(range(-10, 10))
.map(my_function))
dask_client.persist(results)
errors = (results
.pluck("error")
.filter(lambda x: x is not None)
.compute())
print(errors)
results = (results
.pluck("result")
.filter(lambda x: x is not None)
.compute())
print(results)
Run Code Online (Sandbox Code Playgroud)
这是有效的,但我想知道我是否在这里对汤饼进行喷砂处理.编辑:另一个选择是使用像Maybemonad 这样的东西,但我想再次知道我是否过度思考它.
Dask 自动打包远程发生的异常并在本地重新引发它们。这是我运行你的示例时得到的结果
In [1]: from dask.distributed import Client
In [2]: client = Client('localhost:8786')
In [3]: import dask.bag
In [4]: try:
...: def my_function (value):
...: return 1 / value
...:
...: results = (dask.bag
...: .from_sequence(range(-10, 10))
...: .map(my_function))
...:
...: print(results.compute())
...:
...: except Exception as e:
...: import pdb; pdb.set_trace()
...: print("error: %s" % e)
...:
distributed.utils - ERROR - division by zero
> <ipython-input-4-17aa5fbfb732>(13)<module>()
-> print("error: %s" % e)
(Pdb) pp e
ZeroDivisionError('division by zero',)
Run Code Online (Sandbox Code Playgroud)