PySpark SQL 中的用户定义聚合函数

Rus*_*rdt 5 user-defined-functions pandas apache-spark apache-spark-sql pyspark

如何在 PySpark SQL 中实现用户定义聚合函数 (UDAF)?

pyspark version = 3.0.2
python version = 3.7.10
Run Code Online (Sandbox Code Playgroud)

作为一个最小的示例,我想用 UDAF 替换 AVG 聚合函数:

sc = SparkContext()
sql = SQLContext(sc)
df = sql.createDataFrame(
    pd.DataFrame({'id': [1, 1, 2, 2], 'value': [1, 2, 3, 4]}))
df.createTempView('df')
rv = sql.sql('SELECT id, AVG(value) FROM df GROUP BY id').toPandas()
Run Code Online (Sandbox Code Playgroud)

其中 rv 将是:

In [2]: rv
Out[2]:
   id  avg(value)
0   1         1.5
1   2         3.5
Run Code Online (Sandbox Code Playgroud)

UDAF 如何替换AVG查询中的内容?

例如,这不起作用

import numpy as np
def udf_avg(x):
    return np.mean(x)
sql.udf.register('udf_avg', udf_avg)
rv = sql.sql('SELECT id, udf_avg(value) FROM df GROUP BY id').toPandas()
Run Code Online (Sandbox Code Playgroud)

这个想法是用纯 Python 实现 UDAF,用于 SQL 聚合函数(例如低通滤波器)不支持的处理。

Rus*_*rdt 4

可以使用 Pandas UDF,其中定义与Spark 3.0和兼容Python 3.6+。有关详细信息,请参阅问题文档。

Spark SQL 中的完整实现:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    pd.DataFrame({'id': [1, 1, 2, 2], 'value': [1, 2, 3, 4]}))
df.createTempView('df')

@pandas_udf(DoubleType())
def avg_udf(s: pd.Series) -> float:
    return s.mean()
spark.udf.register('avg_udf', avg_udf)

rv = spark.sql('SELECT id, avg_udf(value) FROM df GROUP BY id').toPandas()
Run Code Online (Sandbox Code Playgroud)

有返回值

In [2]: rv
Out[2]:
   id  avg_udf(value)
0   1             1.5
1   2             3.5
Run Code Online (Sandbox Code Playgroud)