pyspark udf 打印正在分析的行

Fra*_*Boi 3 python static-variables user-defined-functions python-3.x pyspark

我在 pyspark udf 函数中有问题,我想打印产生问题的行数。

我尝试使用 Python 中的“静态变量”等价物来计算行数,以便当用新行调用 udf 时,计数器会增加。但是,它不起作用:

import pyspark.sql.functions as F
def myF(input):
    myF.lineNumber += 1
    if (somethingBad):
        print(myF.lineNumber)
    return res

myF.lineNumber = 0

myF_udf =  F.udf(myF, StringType())
Run Code Online (Sandbox Code Playgroud)

如何计算调用 udf 的次数,以便在 pyspark 中找到产生问题的行数?

小智 7

udfs 在工作人员处执行,因此其中的打印语句不会显示在输出中(来自驱动程序)。处理 UDF 问题的最佳方法是将 UDF 的返回类型更改为结构或列表,并将错误信息与返回的输出一起传递。在下面的代码中,我只是将错误信息添加到您最初返回的字符串 res 中。

import pyspark.sql.functions as F
def myF(input):
  myF.lineNumber += 1
  if (somethingBad):
    res += 'Error in line {}".format(myF.lineNumber)
  return res

myF.lineNumber = 0

myF_udf =  F.udf(myF, StringType())
Run Code Online (Sandbox Code Playgroud)