Dev*_*eda 7 python asynchronous future concurrent.futures pyspark
我正在尝试调用使用 pyspark rdd 对象方法的 python 函数,并且非常耗时,这会阻止我的应用程序。我需要以异步方式编写它,以便我的应用程序不会被阻止。这是我想做的实际事情的缩影版本。
from concurrent.futures import Future
from pyspark import SparkContext
sc = SparkContext()
def add(a, b):
f = Future()
c = a + b
d = a*b
t = (c,d)
rdd = sc.parallelize([t])
f.set_result(rdd)
# return rdd
if __name__ == '__main__':
f1 = add(90,8)
f2 = add(8, 89)
while (not f1.done()) and (not f2.done()):
pass
print(f1.result())
print(f2.result())
Run Code Online (Sandbox Code Playgroud)
我知道上面的代码不会立即起作用。我该如何修改它,以便它能够工作?
我认为你应该在你的函数中返回 f :
def add(a, b):
f = Future()
c = a + b
d = a*b
t = (c,d)
rdd = sc.parallelize([t])
f.set_result(rdd)
return f
Run Code Online (Sandbox Code Playgroud)
但不要忘记你的 rdd 是懒惰的。如果不采取任何行动,应该不会消耗那么多时间。
| 归档时间: |
|
| 查看次数: |
6714 次 |
| 最近记录: |