use*_*842 5 python apache-spark pyspark
我一直在使用这个在线jupyter笔记本https://tmpnb.org/上的Spark和Python,并尝试了3种传递python函数的方法:
1)使用地图
import numpy as np
def my_sqrt(x):
return np.sqrt(x)
sc.parallelize(range(10)).map(my_sqrt).collect()
Run Code Online (Sandbox Code Playgroud)
2)并行化my_sqrt并调用它
sc.parallelize([(my_sqrt, i) for i in range(10)]).map(lambda x : x[0](x[1])).collect()
Run Code Online (Sandbox Code Playgroud)
3)并行化np.sqrt并调用它
sc.parallelize([(np.sqrt, i) for i in range(10)]).map(lambda x : x[0](x[1])).collect()
Run Code Online (Sandbox Code Playgroud)
(1)和(3)做工作,(2)没有.首先,我想了解为什么/如何(1)和(3)工作.其次,我想了解为什么(2)没有,以及可以做些什么来使它工作.
第一种方法是有效的,因为Spark使用特殊的序列化策略来处理转换所需的闭包,这种闭包明显慢于标准pickle(但我们无法使用.map(lambda x: ...)).
最后一种方法有效,因为根本不需要序列化功能代码.它sqrt从numpy模块引用,只要每个工作者都可以访问NumPy就没有问题.
第二种方法不起作用,因为酸洗不会序列化代码.
import pickle
pickle.dumps(my_sqrt)
## b'\x80\x03c__main__\nmy_sqrt\nq\x00.'
Run Code Online (Sandbox Code Playgroud)
所有这一切都说明请给我一个my_sqrt从顶级脚本环境(又名)分配给(my_sqrt.__name__)的对象.当它在worker上执行时,它不使用相同的环境,并且范围中不再有这样的对象,因此是例外.要清楚它既不是一个bug,也不是特定于Spark的东西.您可以在本地轻松地重现相同的行为,如下所示:__main__
In [1]: import pickle
In [2]: def foo(): ...
In [3]: foo_ = pickle.dumps(foo)
In [4]: pickle.loads(foo_)
Out[4]: <function __main__.foo>
In [5]: del foo
In [6]: pickle.loads(foo_)
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
...
AttributeError: Can't get attribute 'foo' on <module '__main__'>
Run Code Online (Sandbox Code Playgroud)
由于它不涉及实际值,您甚至可以像这样重新分配:
In [7]: foo = "foo"
In [8]: pickle.loads(foo_)
Out[8]: 'foo'
Run Code Online (Sandbox Code Playgroud)
这里带走的消息是,如果你想使用一个函数,就这样把它放在一个单独的模块中,并像在其他依赖项中一样在工作者之间分发它,包括自定义类定义.
| 归档时间: |
|
| 查看次数: |
902 次 |
| 最近记录: |