Spark 非确定性 Pandas UDF 会出现什么问题

aig*_*fer 7 python pandas apache-spark apache-spark-sql pyspark

我正在编写一个需要为基于某些条件匹配的某些组生成 UUID 的过程。我让我的代码正常工作,但我担心在我的 UDF 中创建 UUID 的潜在问题(因此使其变得不确定)。下面是一些代码的简化示例来说明:

from uuid import uuid1

from pyspark.sql import SparkSession
from pyspark.sql.functions import PandasUDFType, pandas_udf

spark = (
    SparkSession.builder.master("local")
    .appName("Word Count")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
df = spark.createDataFrame([["j", 3], ["h", 3], ["a", 2]], ["name", "age"])


@pandas_udf("name string, age integer, uuid string", PandasUDFType.GROUPED_MAP)
def create_uuid(df):
    df["uuid"] = str(uuid1())
    return df


>>> df.groupby("age").apply(create_uuid).show()
+----+---+--------------------+
|name|age|                uuid|
+----+---+--------------------+
|   j|  3|1f8f48ac-0da8-430...|
|   h|  3|1f8f48ac-0da8-430...|
|   a|  2|d5206d03-bcce-445...|
+----+---+--------------------+
Run Code Online (Sandbox Code Playgroud)

这目前适用于在 AWS Glue 上处理超过 20 万条记录的一些数据,我还没有发现任何错误。

我使用uuid1因为它使用节点信息来生成 UUID,从而确保没有 2 个节点生成相同的 id。

我的一个想法是将 UDF 注册为非确定性的:

udf = pandas_udf(
    create_uuid, "name string, age integer, uuid string", PandasUDFType.GROUPED_MAP
).asNondeterministic()
Run Code Online (Sandbox Code Playgroud)

但这给了我以下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o60.flatMapGroupsInPandas.
: org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in
Project, Filter, Aggregate or Window, found:
 `age`,create_uuid(name, age),`name`,`age`,`uuid`
in operator FlatMapGroupsInPandas [age#1L], create_uuid(name#0, age#1L), [name#7, age#8, uuid#9]
               ;;
FlatMapGroupsInPandas [age#1L], create_uuid(name#0, age#1L), [name#7, age#8, uuid#9]
+- Project [age#1L, name#0, age#1L]
   +- LogicalRDD [name#0, age#1L], false
Run Code Online (Sandbox Code Playgroud)

我的问题是:

  • 这可能会遇到哪些潜在问题?
  • 如果它确实存在潜在问题,那么我可以在哪些方面做出确定性决定?
  • 为什么不能将 GROUPED_MAP 函数标记为非确定性的?

Chr*_*ris 6

您的函数是非确定性的,但 Spark 将其视为确定性的,即“由于优化,可能会消除重复调用”。但是,对 的每次调用都pandas_udf将是唯一的输入(按键分组的行),因此pandas_udf不会触发对重复调用的优化。因此,asNondeterministic抑制这种优化的方法对于 a类型来说是多余pandas_udfGROUPED_MAP。在我看来,这解释了为什么GroupedData.apply函数没有被编码为接受pandas_udf标记为非确定性的。这是没有意义的,因为没有可以抑制的优化机会。