Why does a PySpark UDF that operates on a column generated by rand() fail?

gmd*_*mds 6 python apache-spark pyspark

Given the following Python function:

def f(col):
    return col

If I turn it into a UDF and apply it to a column object, it works...

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

df = spark.range(10)
udf = F.udf(f, returnType=DoubleType()).asNondeterministic()

df.withColumn('new', udf(F.lit(0))).show()

...Except if the column is generated by rand:

df.withColumn('new', udf(F.rand())).show()  # fails

However, the following two work:

df.withColumn('new', F.rand()).show()
df.withColumn('new', F.rand()).withColumn('new2', udf(F.col('new'))).show()

The error:

Py4JJavaError: An error occurred while calling o469.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 20.0 failed 1 times, most recent failure: Lost task 0.0 in stage 20.0 (TID 34, localhost, executor driver): java.lang.NullPointerException

Why does this happen, and how can I use a rand column expression created within a UDF?

rlu*_*uta 12

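The core issue is that the rand() function on the JVM side depends on a transient rng variable that does not survive serialization/deserialization, combined with an eval implementation that is not null-safe (defined in the RDG class and its Rand subclass). As far as I can tell, rand() and randn() are the only functions in Spark with these particular properties.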
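The failure mode can be mimicked in plain Python. This is a hypothetical analogy, not Spark code: `RandLike` stands in for the JVM `Rand` expression, `__getstate__` plays the role of Scala's `@transient` by dropping the generator when the object is pickled, and the `AttributeError` raised on `None` plays the role of the `NullPointerException` in the question.

```python
import pickle
import random


class RandLike:
    """Hypothetical stand-in for Spark's JVM Rand expression (not real Spark code)."""

    def __init__(self, seed):
        self.seed = seed
        self._rng = None  # "transient" state: only set by initialize()

    def initialize(self, partition_index):
        # Each partition gets its own generator, seeded with seed + partition index.
        self._rng = random.Random(self.seed + partition_index)

    def eval(self):
        # Not null-safe: assumes initialize() has already been called.
        return self._rng.random()

    def __getstate__(self):
        # Mimic @transient: the generator is not part of the serialized state.
        state = self.__dict__.copy()
        state["_rng"] = None
        return state


expr = RandLike(seed=42)
expr.initialize(partition_index=0)
print(expr.eval())  # works: the generator was initialized on this side

restored = pickle.loads(pickle.dumps(expr))
try:
    restored.eval()
except AttributeError as exc:
    # Python's analogue of the java.lang.NullPointerException
    print("failed:", exc)
```

The initialized object works, but its serialized copy arrives with no generator and fails on the first `eval()`.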

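When you write udf(F.rand()), Spark evaluates it as a single PythonUDF expression and therefore serializes the rand() call in the command_pickle, losing the initialized transient state in the process. This can be observed in the execution plan: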

df.withColumn('new', udf(F.rand())).explain()

== Physical Plan ==
*(2) Project [id#0L, pythonUDF0#95 AS new#92]
+- BatchEvalPython [f(rand(-6878806567622466209))], [id#0L, pythonUDF0#95]
   +- *(1) Range (0, 10, step=1, splits=8)

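Unfortunately, there is no way around this short of fixing Spark so that the Rand class is null-safe. However, if you just need to generate random numbers, you can easily build your own rand() UDF around Python's random generator: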

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from random import random

def f(col):
    return col

df = spark.range(10)
udf = F.udf(f, returnType=DoubleType()).asNondeterministic()
# Python-side stand-in for F.rand(): a zero-argument UDF wrapping random.random
rand = F.udf(random, returnType=DoubleType()).asNondeterministic()

df.withColumn('new', udf(rand())).show()

+---+-------------------+
| id|                new|
+---+-------------------+
|  0| 0.4384090392727712|
|  1| 0.5827392568376621|
|  2| 0.4249312702725516|
|  3| 0.8423409231783007|
|  4|0.39533981334524604|
|  5| 0.7073194901736066|
|  6|0.19176164335919255|
|  7| 0.7296698171715453|
|  8|  0.799510901886918|
|  9|0.12662129139761658|
+---+-------------------+
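What "making Rand null-safe" would mean can be sketched with a pure-Python analogy. This is hypothetical, not a patch for Spark: `NullSafeRand` lazily creates its generator the first time `eval()` runs, so a copy that lost its transient generator during pickling recovers instead of failing. It assumes the seed and partition index travel with the serialized state.

```python
import pickle
import random


class NullSafeRand:
    """Hypothetical null-safe variant of a Rand-like expression (not Spark code)."""

    def __init__(self, seed, partition_index=0):
        self.seed = seed
        self.partition_index = partition_index
        self._rng = None  # transient: dropped when pickled

    def eval(self):
        # Null-safe: (re)create the generator on first use, e.g. after deserialization.
        if self._rng is None:
            self._rng = random.Random(self.seed + self.partition_index)
        return self._rng.random()

    def __getstate__(self):
        # Mimic @transient: do not serialize the generator itself.
        state = self.__dict__.copy()
        state["_rng"] = None
        return state


expr = NullSafeRand(seed=7)
restored = pickle.loads(pickle.dumps(expr))
print(restored.eval() == NullSafeRand(seed=7).eval())  # True: same seed, same first draw
```

Because the generator is rebuilt from the serialized seed, the restored copy produces the same sequence a freshly initialized one would.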