PySpark 代码运行在 JVM 还是 Python 子进程中?

Cha*_* Ju 3 python apache-spark pyspark

当我使用 python3 t1.py 运行以下名为 t1.py 的脚本时,我想了解幕后发生了什么。具体来说,我有以下问题:

  1. 向spark worker节点提交什么样的代码?它是提交给 spark 工作节点的 python 代码还是翻译后的等效 Java 代码?
  2. reduce 中的 add 操作是否被视为 UDF 并因此在工作节点上的 python 子进程中运行?
  3. 如果添加操作在工作节点上的 python 子进程中运行,工作 JVM 是否会为要添加的分区中的每个数字与 python 子进程通信?如果是这种情况,则意味着大量开销。
    #!/home/python3/venv/bin/python3
    #this file is named t1.py   
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark.sql.types import DecimalType, IntegerType
    import pyspark.sql.functions as F
    from operator import add
    import pandas as pd
    from datetime import datetime

    len = int(100000000/1)
    print("len=", len)
    spark = SparkSession.builder.appName('ai_project').getOrCreate()

    start = datetime.now()
    t=spark.sparkContext.parallelize(range(len))
    a = t.reduce(add)
    print(a)
    end= datetime.now()
    print("end for spark rdd sum:", end, end-start)
Run Code Online (Sandbox Code Playgroud)

Hri*_*iev 7

在 PySpark 中,Python 和 JVM 代码位于不同的操作系统进程中。PySpark 使用 Py4J,这是一个促进两种语言之间互操作的框架,在 Python 和 JVM 进程之间交换数据。

当您启动 PySpark 作业时,它作为一个 Python 进程启动,然后生成一个 JVM 实例并在其中运行一些 PySpark 特定代码。然后它在那个 JVM 中实例化一个 Spark 会话,它成为 Spark 看到的驱动程序。该驱动程序连接到 Spark master 或产生一个 in-proc ,具体取决于会话的配置方式。

当您创建 RDD 或数据帧时,它们就像由 Scala 或 Java 应用程序创建的 RDD 和数据帧一样存储在 Spark 集群的内存中。对它们的转换和操作就像它们在 JVM 中一样工作,但有一个显着区别:任何涉及通过 Python 代码传递数据的东西,都在 JVM 之外运行。因此,如果您创建一个 Dataframe,并执行以下操作:

df.select("foo", "bar").where(df["foo"] > 100).count()
Run Code Online (Sandbox Code Playgroud)

这完全在 JVM 中运行,因为没有数据必须通过的 Python 代码。另一方面,如果你这样做:

a = t.reduce(add)
Run Code Online (Sandbox Code Playgroud)

由于add操作符是 Python 操作符,因此 RDD 被序列化,然后发送到一个或多个 Python 进程执行归约,然后结果再次序列化并返回到 JVM,最后传输到 Python 驱动程序进程最后减少。

它的工作方式(涵盖您的 Q1)是这样的:

  • 每个 Spark JVM 执行器都会生成一个运行特殊 PySpark 脚本的新 Python 子进程
  • Python 驱动程序序列化每个 Spark 任务(例如,add运算符)必须执行的字节码,并将其与一些附加数据一起腌制
  • JVM 执行器序列化其 RDD 分区,并将它们与从驱动程序接收的序列化 Python 字节码一起发送到其 Python 子进程
  • Python 代码在 RDD 数据上运行
  • 结果被序列化返回并发送到 JVM 执行器

JVM 执行器使用网络套接字与它们生成的 Python 子进程通信,它们启动的特殊 PySpark 脚本运行一个循环,其任务是坐在那里并期望运行序列化数据和字节码。

关于 Q3,JVM 执行程序将整个 RDD 分区传输到 Python 子进程,而不是单个项目。您应该努力使用 Pandas UDF,因为它们可以被矢量化。

如果您对详细信息感兴趣,请从源代码开始python/pyspark/rdd.py并查看RDD该类。

  • Python 中的“add”运算符是多态的。当你在 Scala 中执行类似“t.reduce(_ + _)”的操作时,编译器知道“+”的两个参数的类型,因为“t”本身是“RDD[Int]”,因此它会生成一个 lambda 函数这需要两个“Int”并对它们求和。在 Python 中,您必须以某种方式分析 RDD 并使用某种类型和运算符匹配来发出正确的 JVM 代码……我想这需要更多的努力,但没有什么实际好处。 (5认同)