小编Dev*_*eda的帖子

使用concurrent.futures异步调用pyspark函数

我正在尝试调用使用 pyspark rdd 对象方法的 python 函数,并且非常耗时,这会阻止我的应用程序。我需要以异步方式编写它,以便我的应用程序不会被阻止。这是我想做的实际事情的缩影版本。

from concurrent.futures import Future
from pyspark import SparkContext

sc = SparkContext()

def add(a, b):
    f = Future()
    c = a + b
    d = a*b
    t = (c,d)
    rdd = sc.parallelize([t])
    f.set_result(rdd)
    # return rdd

if __name__ == '__main__':

    f1 = add(90,8)
    f2 = add(8, 89)

    while (not f1.done()) and (not f2.done()):
        pass

    print(f1.result())
    print(f2.result())
Run Code Online (Sandbox Code Playgroud)

我知道上面的代码不会立即起作用。我该如何修改它,以便它能够工作?

python asynchronous future concurrent.futures pyspark

7
推荐指数
1
解决办法
6714
查看次数

无法从pyspark RDD的map方法访问类方法

在我的应用程序的代码库中集成pyspark时,我无法在RDD的map方法中引用类的方法.我用一个简单的例子重复了这个问题,如下所示

这是一个虚拟类,我已经定义了它只是为RDD的每个元素添加一个数字,RDD是一个类属性:

class Test:

    def __init__(self):
        self.sc = SparkContext()
        a = [('a', 1), ('b', 2), ('c', 3)]
        self.a_r = self.sc.parallelize(a)

    def add(self, a, b):
        return a + b

    def test_func(self, b):
        c_r = self.a_r.map(lambda l: (l[0], l[1] * 2))
        v = c_r.map(lambda l: self.add(l[1], b))
        v_c = v.collect()
        return v_c
Run Code Online (Sandbox Code Playgroud)

test_func()map()在RDD上调用方法v,然后在add()每个元素上调用方法v.调用test_func()抛出以下错误:

pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, …
Run Code Online (Sandbox Code Playgroud)

python rdd pyspark

2
推荐指数
1
解决办法
1206
查看次数

标签 统计

pyspark ×2

python ×2

asynchronous ×1

concurrent.futures ×1

future ×1

rdd ×1