使用UDF处理多个列时堆栈溢出

Alb*_*nto 4 python user-defined-functions apache-spark apache-spark-sql pyspark

我有一个类型DataFrame很多的列str,我想将一个函数应用于所有这些列,而不重命名它们或添加更多列,我尝试使用for-in循环执行withColumn(参见下面的示例),但通常在我运行代码时,它显示了Stack Overflow(它很少有效),这DataFrame根本不大,它只有~15000条记录.

# df is a DataFrame
def lowerCase(string):
    return string.strip().lower()

lowerCaseUDF = udf(lowerCase, StringType())

for (columnName, kind) in df.dtypes:
    if(kind == "string"):
        df = df.withColumn(columnName, lowerCaseUDF(df[columnName]))

df.select("Tipo_unidad").distinct().show()
Run Code Online (Sandbox Code Playgroud)

完整的错误很长,因此我决定只粘贴一些行.但是你可以在这里找到完整的跟踪

Py4JJavaError:调用o516.showString时发生错误.:org.apache.spark.SparkException:作业因阶段失败而中止:阶段2.0中的任务1失败4次,最近失败:阶段2.0中丢失的任务1.3(TID 38,worker2.mcbo.mood.com.ve): java.io.ObjectInputStream中的java.lang.StackOverflowError $ BlockDataInputStream.readByte(ObjectInputStream.java:2774)

我认为这个问题产生的原因是这个代码启动了许多工作(每个类型一个string),你能告诉我另一个选择或我做错了吗?

zer*_*323 11

尝试这样的事情:

from pyspark.sql.functions import col, lower, trim

exprs = [
    lower(trim(col(c))).alias(c) if t == "string" else col(c) 
    for (c, t) in df.dtypes
]

df.select(*exprs)
Run Code Online (Sandbox Code Playgroud)

与当前解决方案相比,此方法有两个主要优势:

  • 它只需要单个投影(没有增长的谱系,最有可能负责SO),而不是每个字符串列的投影.
  • 它只直接操作内部表示而不将数据传递给Python(BatchPythonProcessing).

  • 工作完美,但是如果我必须在每个字符串列中应用一个非常复杂的函数,我该怎么办 (2认同)
  • 嗯,几乎一样的方式:)如果你不能使用表达式(在1.6 Spark它不应该是一个问题 - 有足够的选择,所以你可以创建任意复杂的转换)只需用UDF替换`lower∘trim`. (2认同)