如何在多列上编写Pyspark UDAF?

Hun*_*nle 4 apache-spark rdd apache-spark-sql pyspark

我在pyspark数据框中有以下数据end_stats_df:

values     start    end    cat1   cat2
10          1        2      A      B
11          1        2      C      B
12          1        2      D      B
510         1        2      D      C
550         1        2      C      B
500         1        2      A      B
80          1        3      A      B
Run Code Online (Sandbox Code Playgroud)

我想以下列方式聚合它:

  • 我想使用"开始"和"结束"列作为聚合键
  • 对于每组行,我需要执行以下操作:
    • 计算该组cat1cat2该组中唯一的值数.例如,对于start= 1和end= 2 的组,该数字将是4,因为存在A,B,C,D.该数字将被存储为n(在该示例中n = 4).
    • 对于该values字段,我需要对每个组进行排序values,然后选择每个n-1值,其中n是从上面第一个操作中存储的值.
    • 在聚合结束时,我并不关心上述操作中cat1cat2之后的内容.

上例中的示例输出是:

values     start    end    cat1   cat2
12          1        2      D      B
550         1        2      C      B
80          1        3      A      B
Run Code Online (Sandbox Code Playgroud)

如何使用pyspark数据帧完成?我假设我需要使用自定义UDAF,对吧?

Zha*_*ong 9

Pyspark不UDAF直接支持,因此我们必须手动进行聚合.

from pyspark.sql import functions as f

def func(values, cat1, cat2):
    n = len(set(cat1 + cat2))
    return sorted(values)[n - 2]


df = spark.read.load('file:///home/zht/PycharmProjects/test/text_file.txt', format='csv', sep='\t', header=True)
df = df.groupBy(df['start'], df['end']).agg(f.collect_list(df['values']).alias('values'),
                                            f.collect_set(df['cat1']).alias('cat1'),
                                            f.collect_set(df['cat2']).alias('cat2'))
df = df.select(df['start'], df['end'], f.UserDefinedFunction(func, StringType())(df['values'], df['cat1'], df['cat2']))
Run Code Online (Sandbox Code Playgroud)

  • PySpark 现在在 pandas 的帮助下支持 UDAF:请参阅[此答案](/sf/answers/3324847081/)。 (2认同)