使用concurrent.futures异步调用pyspark函数

Dev*_*eda 7 python asynchronous future concurrent.futures pyspark

我正在尝试调用使用 pyspark rdd 对象方法的 python 函数,并且非常耗时,这会阻止我的应用程序。我需要以异步方式编写它,以便我的应用程序不会被阻止。这是我想做的实际事情的缩影版本。

from concurrent.futures import Future
from pyspark import SparkContext

sc = SparkContext()

def add(a, b):
    f = Future()
    c = a + b
    d = a*b
    t = (c,d)
    rdd = sc.parallelize([t])
    f.set_result(rdd)
    # return rdd

if __name__ == '__main__':

    f1 = add(90,8)
    f2 = add(8, 89)

    while (not f1.done()) and (not f2.done()):
        pass

    print(f1.result())
    print(f2.result())
Run Code Online (Sandbox Code Playgroud)

我知道上面的代码不会立即起作用。我该如何修改它,以便它能够工作?

Ste*_*ven 2

我认为你应该在你的函数中返回 f :

def add(a, b):
    f = Future()
    c = a + b
    d = a*b
    t = (c,d)
    rdd = sc.parallelize([t])
    f.set_result(rdd)
    return f
Run Code Online (Sandbox Code Playgroud)

但不要忘记你的 rdd 是懒惰的。如果不采取任何行动,应该不会消耗那么多时间。