我正在尝试调用使用 pyspark rdd 对象方法的 python 函数,并且非常耗时,这会阻止我的应用程序。我需要以异步方式编写它,以便我的应用程序不会被阻止。这是我想做的实际事情的缩影版本。
from concurrent.futures import Future
from pyspark import SparkContext
sc = SparkContext()
def add(a, b):
f = Future()
c = a + b
d = a*b
t = (c,d)
rdd = sc.parallelize([t])
f.set_result(rdd)
# return rdd
if __name__ == '__main__':
f1 = add(90,8)
f2 = add(8, 89)
while (not f1.done()) and (not f2.done()):
pass
print(f1.result())
print(f2.result())
Run Code Online (Sandbox Code Playgroud)
我知道上面的代码不会立即起作用。我该如何修改它,以便它能够工作?
在我的应用程序的代码库中集成pyspark时,我无法在RDD的map方法中引用类的方法.我用一个简单的例子重复了这个问题,如下所示
这是一个虚拟类,我已经定义了它只是为RDD的每个元素添加一个数字,RDD是一个类属性:
class Test:
def __init__(self):
self.sc = SparkContext()
a = [('a', 1), ('b', 2), ('c', 3)]
self.a_r = self.sc.parallelize(a)
def add(self, a, b):
return a + b
def test_func(self, b):
c_r = self.a_r.map(lambda l: (l[0], l[1] * 2))
v = c_r.map(lambda l: self.add(l[1], b))
v_c = v.collect()
return v_c
Run Code Online (Sandbox Code Playgroud)
test_func()map()在RDD上调用方法v,然后在add()每个元素上调用方法v.调用test_func()抛出以下错误:
pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, …Run Code Online (Sandbox Code Playgroud)