如何使用PySpark并行运行独立转换?

pre*_*jha 12 python-2.7 apache-spark apache-spark-sql python-multiprocessing pyspark

我试图运行2个函数,使用PySpark在一个RDD上进行完全独立的转换.有什么方法可以做同样的事情?

def doXTransforms(sampleRDD):
    (X transforms)

def doYTransforms(sampleRDD):
    (Y Transforms)

if __name__ == "__main__":
    sc = SparkContext(appName="parallelTransforms")
    sqlContext = SQLContext(sc)
    hive_context = HiveContext(sc)

    rows_rdd = hive_context.sql("select * from tables.X_table")

    p1 = Process(target=doXTransforms , args=(rows_rdd,))
    p1.start()
    p2 = Process(target=doYTransforms, args=(rows_rdd,))  
    p2.start()
    p1.join()
    p2.join()
    sc.stop()
Run Code Online (Sandbox Code Playgroud)

这不起作用,我现在明白这不起作用.但有没有其他方法可以使这项工作?特别是有任何python-spark特定解决方案吗?

zer*_*323 11

只需使用线程并确保集群有足够的资源同时处理这两个任务.

from threading import Thread
import time

def process(rdd, f):
    def delay(x):
        time.sleep(1)
        return f(x)
    return rdd.map(delay).sum()


rdd = sc.parallelize(range(100), int(sc.defaultParallelism / 2))

t1 = Thread(target=process, args=(rdd, lambda x: x * 2))
t2  = Thread(target=process, args=(rdd, lambda x: x + 1))
t1.start(); t2.start()
Run Code Online (Sandbox Code Playgroud)

可以说这在实践中并不常用,但在其他方面应该可以正常工作.

您可以进一步使用调度FAIR程序和调度程序池的应用程序内调度,以更好地控制执行策略.

您也可以尝试pyspark-asyncactions(免责声明 - 本答案的作者也是该软件包的作者),它提供了一组围绕Spark API的包装器,并且concurrent.futures:

import asyncactions
import concurrent.futures

f1 = rdd.filter(lambda x: x % 3 == 0).countAsync()
f2 = rdd.filter(lambda x: x % 11 == 0).countAsync()

[x.result() for x in concurrent.futures.as_completed([f1, f2])]
Run Code Online (Sandbox Code Playgroud)