如何在pyspark中进行并行处理

Amo*_*mol 3 apache-spark gcloud pyspark

我想使用 pyspark 在 for 循环中进行并行处理。

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('yarn').appName('myAppName').getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

data = [a,b,c]


for i in data:
    try:
        df = spark.read.parquet('gs://'+i+'-data')
        df.createOrReplaceTempView("people")
        df2=spark.sql("""select * from people """)
        df.show()
    except Exception as e:
        print(e)
        continue
Run Code Online (Sandbox Code Playgroud)

上面提到的脚本工作正常,但我想在 pyspark 中进行并行处理,这在 scala 中是可能的

And*_*101 7

Spark 本身并行运行作业,但如果您仍然希望在代码中并行执行,您可以使用简单的 python 代码进行并行处理(这已在 DataBricks Only链接上进行了测试)。

data = ["a","b","c"]

from multiprocessing.pool import ThreadPool
pool = ThreadPool(10)


def fun(x):
    try:
        df = sqlContext.createDataFrame([(1,2, x), (2,5, "b"), (5,6, "c"), (8,19, "d")], ("st","end", "ani"))
        df.show()
    except Exception as e:
        print(e)

pool.map( fun,data)
Run Code Online (Sandbox Code Playgroud)

我对您的代码进行了一些更改,但这基本上就是您运行并行任务的方式,如果您有一些想要并行运行的平面文件,只需创建一个包含其名称的列表并将其传递到pool.map( fun,data)

改变功能根据需要fun 。

有关多处理模块的更多详细信息,请查看文档

同样,如果您想在 Scala 中执行此操作,您将需要以下模块

import scala.concurrent.{Future, Await}
Run Code Online (Sandbox Code Playgroud)

如需更详细的了解,请查看内容。该代码适用于 Databricks,但经过一些更改,它将适用于您的环境。

  • 这是代码中的并行执行,而不是实际的并行执行。这是简单的 python 并行 Processign,它不会干扰 Spark 并行性。 (6认同)