I'm actually trying to define a UDF that involves an accumulator. The accumulator is used to save the exceptions raised inside my_function for later use. I came up with a udf definition that takes a couple of parameters (returnType, accumulator), but I'd like to make it more readable and reusable. How can I define a decorator function for the code below?
from pyspark.sql import functions as F
from pyspark.accumulators import AccumulatorParam
from pyspark.sql.types import StringType, StructField, IntegerType, StructType
from pyspark.sql import Row
data = [
    Row(word="foo", number=7),
    Row(word="bar", number=13)]

schema = StructType([
    StructField("word", StringType(), True),
    StructField("number", IntegerType(), True)])

df = spark.createDataFrame(data, schema)
class ListParam(AccumulatorParam):
    def zero(self, v):
        return []

    def addInPlace(self, variable, value):
        variable.append(value)
        return variable
accum = spark.sparkContext.accumulator([], ListParam())
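
For illustration, here is a minimal sketch of how this accumulator can be exercised from a plain UDF, reusing the imports, df, and accum defined above (record_word and the select are just placeholders, not my real function):

def record_word(w):
    # add() goes through ListParam.addInPlace; updates only reach the driver after an action
    accum.add(w)
    return w

record_word_udf = F.udf(record_word, StringType())
df.select(record_word_udf("word")).collect()
# Note: with this addInPlace, each task's partial list is appended as a single
# element during the merge, so accum.value may come back as a list of lists.
print(accum.value)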
def accumulator_udf(accumulator, returnType):
    def my_function(x):
        y = None
        try:
            y = (x / …
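
Roughly, what I'm aiming for is a decorator factory along these lines. This is only an untested sketch of the shape I have in mind: the name exception_capturing_udf, the functools.wraps wrapper, and the example body of my_function are placeholders, not the original code.

import functools

def exception_capturing_udf(accumulator, returnType):
    # Decorator factory: wraps a plain Python function into a Spark UDF that
    # records any exception in the given accumulator and returns None instead.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                accumulator.add(repr(e))
                return None
        return F.udf(wrapper, returnType)
    return decorator

@exception_capturing_udf(accum, IntegerType())
def my_function(x):
    return x // (x - 7)   # placeholder body; raises ZeroDivisionError when x == 7

df.withColumn("result", my_function(F.col("number"))).show()
print(accum.value)   # exceptions captured on the workers, if any

The usage at the bottom is what I'd like the final API to look like: decorate a plain function once and get back something that can be applied directly to a Column.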