小编TCr*_*net的帖子

Python (pyspark) - 用于 UDF 定义的函数装饰器

我实际上正在尝试定义 UDF,其中包含累加器。累加器用于保存my_function中的异常以供以后使用。我提出了带有一些参数(returnType、accumulator)的 udf 定义。我想让它更具可读性和可重用性。如何使用下面的代码定义装饰器函数?

from pyspark.sql import functions as F
from pyspark.accumulators import AccumulatorParam
from pyspark.sql.types import StringType, StructField, IntegerType, StructType
from pyspark.sql import Row

data = [
    Row(word="foo", number=7),
    Row(word="bar", number=13)]

schema = StructType([
    StructField("word", StringType(), True),
    StructField("number", IntegerType(), True)])

df = spark.createDataFrame(data, schema)
Run Code Online (Sandbox Code Playgroud)

创建我的自定义累加器

class ListParam(AccumulatorParam):
    def zero(self, v):
        return []

    def addInPlace(self, variable, value):
        variable.append(value)
        return variable

accum = spark.sparkContext.accumulator([], ListParam())
Run Code Online (Sandbox Code Playgroud)

我的udf的定义

def accumulator_udf(accumulator, returnType):
    def my_function(x):
        y = None
        try:
            y = (x / …
Run Code Online (Sandbox Code Playgroud)

python user-defined-functions python-decorators apache-spark-sql pyspark

6
推荐指数
0
解决办法
2294
查看次数