TCr*_*net (6) · Tags: python, user-defined-functions, python-decorators, apache-spark-sql, pyspark
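I am trying to define a UDF that contains an accumulator. The accumulator stores the exceptions raised in my_function so they can be inspected later. I came up with a UDF definition that takes some parameters (returnType, accumulator), and I would like to make it more readable and reusable. How can I turn the code below into a decorator function?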
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.accumulators import AccumulatorParam
from pyspark.sql.types import StringType, StructField, IntegerType, StructType
from pyspark.sql import Row

# In the pyspark shell `spark` already exists; in a script, create it
spark = SparkSession.builder.getOrCreate()

data = [
    Row(word="foo", number=7),
    Row(word="bar", number=13)]
schema = StructType([
    StructField("word", StringType(), True),
    StructField("number", IntegerType(), True)])
df = spark.createDataFrame(data, schema)
class ListParam(AccumulatorParam):
    # Each task starts from an empty list
    def zero(self, v):
        return []

    # Append a new value to the running list
    def addInPlace(self, variable, value):
        variable.append(value)
        return variable

accum = spark.sparkContext.accumulator([], ListParam())
def accumulator_udf(accumulator, returnType):
    def my_function(x):
        y = None
        try:
            # Floor division keeps the result an int, matching IntegerType
            y = x // (x - 7)
        except Exception as e:
            # Record the failing input for later inspection
            accumulator.add(dict([('errorType', str(e)), ('Data', x)]))
        return y
    return F.udf(my_function, returnType)

my_udf = accumulator_udf(accumulator=accum, returnType=IntegerType())
df.select(my_udf(df.number)).show()
+---------------+
|div_one(number)|
+---------------+
| null|
| 2|
+---------------+
print(accum.value)
> [[{'errorType': 'integer division or modulo by zero', 'Data': 7}], []]
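The nested shape of `accum.value` comes from `ListParam.addInPlace` being used in two places: inside each task to add one error record, and on the driver to merge each task's whole list into the driver's value. The pure-Python sketch below models that merge without Spark; `simulate_merge` and `FlatListParam` are hypothetical names, and the two-partition input is assumed to mirror the printed `accum.value` above.

```python
# Pure-Python model of how Spark merges the question's ListParam accumulator.
# No pyspark import: ListParam mirrors the question's class without the
# AccumulatorParam base; simulate_merge and FlatListParam are hypothetical.


class ListParam:
    """Same zero/addInPlace logic as the question's ListParam."""

    def zero(self, v):
        return []

    def addInPlace(self, variable, value):
        variable.append(value)
        return variable


class FlatListParam(ListParam):
    """Hypothetical variant: flatten task lists when the driver merges them."""

    def addInPlace(self, variable, value):
        if isinstance(value, list):
            variable.extend(value)  # driver side: merge a task's whole list
        else:
            variable.append(value)  # task side: add one error record
        return variable


def simulate_merge(param, records_per_task):
    """Each task starts from zero(), adds its own records, then the driver
    folds each task's value into its own value with addInPlace."""
    driver_value = []
    for records in records_per_task:
        task_value = param.zero(driver_value)
        for record in records:
            task_value = param.addInPlace(task_value, record)
        driver_value = param.addInPlace(driver_value, task_value)
    return driver_value


# Two partitions, as assumed above: one error in the first, none in the second.
updates = [[{'errorType': 'integer division or modulo by zero', 'Data': 7}], []]
print(simulate_merge(ListParam(), updates))      # nested: one list per task
print(simulate_merge(FlatListParam(), updates))  # flat: just the records
```

A hypothetical `FlatListParam` like this only works while individual records are not themselves lists; if they were, task-side adds and driver-side merges would be indistinguishable.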
After some reading, I found this article (https://www.thecodeship.com/patterns/guide-to-python-function-decorators/) helpful, but my attempt gets stuck on a NameError:
def accumulator_udf(accumulator, returnType):
    def func_wrapper(func):
        return F.udf(func, returnType)
    return func_wrapper

accum = spark.sparkContext.accumulator([], ListParam())

@accumulator_udf(accumulator=accum, returnType=IntegerType())
def my_function(x):
    y = None
    try:
        y = x // (x - 7)
    except Exception as e:
        accumulator.add(dict([('errorType', str(e)), ('Data', x)]))
    return y

df.select(my_function(df.number)).show()
When I try this implementation, I get the following error:
NameError: global name 'accumulator' is not defined
How can I access `accumulator`?
Thanks!
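The `NameError` arises because `accumulator` is a parameter of `accumulator_udf`, so it is only in scope inside that function's body; `my_function` is defined separately at module level and never sees it (the first version worked because `my_function` was nested inside `accumulator_udf`). One possible approach, sketched below in pure Python, is for the decorator to pass the accumulator to the decorated function as an explicit first argument. `ListAccumulator` and `fake_udf` are hypothetical stand-ins for a Spark accumulator and `F.udf` so the sketch runs without a SparkSession; the extra `udf` parameter of `accumulator_udf` is likewise an assumption added for that purpose.

```python
# Pure-Python sketch of a decorator factory that hands the accumulator to the
# decorated function. ListAccumulator and fake_udf are hypothetical stand-ins
# for a Spark accumulator and F.udf so this runs without a SparkSession.
import functools


class ListAccumulator:
    """Stand-in with the same .add()/.value surface used in the question."""

    def __init__(self):
        self.value = []

    def add(self, term):
        self.value.append(term)


def fake_udf(f, returnType=None):
    """Stand-in for F.udf: returns the plain Python function unchanged."""
    return f


def accumulator_udf(accumulator, returnType, udf=fake_udf):
    def func_wrapper(func):
        @functools.wraps(func)
        def with_accumulator(*args):
            # `accumulator` is visible here because with_accumulator is a
            # closure over accumulator_udf's parameters; pass it on explicitly.
            return func(accumulator, *args)
        return udf(with_accumulator, returnType)
    return func_wrapper


accum = ListAccumulator()


@accumulator_udf(accumulator=accum, returnType=int)
def my_function(accumulator, x):
    # `accumulator` is now an ordinary parameter, so there is no NameError.
    y = None
    try:
        y = x // (x - 7)
    except Exception as e:
        accumulator.add({'errorType': str(e), 'Data': x})
    return y


print(my_function(13))  # 2
print(my_function(7))   # None; the ZeroDivisionError is recorded in accum
print(accum.value)
```

With Spark, the assumption is that you would call `accumulator_udf(accumulator=accum, returnType=IntegerType(), udf=F.udf)`, since `F.udf(f, returnType)` accepts any Python callable, and then use `df.select(my_function(df.number))` as before. A different design is to have the wrapper itself catch exceptions and call `accumulator.add`, so the decorated function needs no extra parameter.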
Views: 2294