How to find the most frequent element in a list column in PySpark?


I have a PySpark DataFrame with two columns, ID and Elements. The Elements column holds lists of elements. It looks like this:

ID | Elements
_______________________________________
X  |[Element5, Element1, Element5]
Y  |[Element Unknown, Element Unknown, Element_Z]

I want to create a new column holding the most frequent element of each list in the Elements column. The output should look like this:

ID | Elements                                           | Output_column 
__________________________________________________________________________
X  |[Element5, Element1, Element5]                      | Element5
Y  |[Element Unknown, Element Unknown, Element_Z]       | Element Unknown 
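
For reference, a minimal DataFrame with this shape can be built as follows (a sketch; the column types are assumed to be string and array of strings):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data matching the layout above.
df = spark.createDataFrame(
    [("X", ["Element5", "Element1", "Element5"]),
     ("Y", ["Element Unknown", "Element Unknown", "Element_Z"])],
    ["ID", "Elements"],
)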

How can I do this with PySpark?

Thanks in advance.


We can use higher-order functions here (transform and aggregate are available from Spark 2.4+; note that the array_sort comparator used in step 2 requires Spark 3.0+).

  1. First use transform with aggregate to get the count of each distinct value in the array (a standalone sketch of this expression follows the list).
  2. Then sort the resulting array of structs in descending order of count and take the first element.
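
To see step 1 in isolation, here is a minimal sketch run against a literal array (assuming an active SparkSession named spark; the values are made up for demonstration):

spark.sql("""
    SELECT transform(array_distinct(arr), x ->
             aggregate(arr, 0, (acc, y) -> IF(y = x, acc + 1, acc))) AS counts
    FROM (SELECT array('a', 'b', 'a') AS arr) AS t
""").show()
# +------+
# |counts|
# +------+
# |[2, 1]|   <- 'a' occurs twice, 'b' once
# +------+

Applied to the DataFrame, the full solution is: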

from pyspark.sql import functions as F

# For each row: Dist = distinct values, Counts = occurrences of each
# distinct value, Map = an array of (value, count) structs.
temp = (df.withColumn("Dist", F.array_distinct("Elements"))
          .withColumn("Counts", F.expr("""transform(Dist, x ->
                        aggregate(Elements, 0, (acc, y) -> IF(y = x, acc + 1, acc)))"""))
          .withColumn("Map", F.arrays_zip("Dist", "Counts"))
          .drop("Dist", "Counts"))

# Sort the structs by count descending and take the first one's value;
# element_at returns null for an empty array.
out = temp.withColumn("Output_column",
          F.expr("""element_at(array_sort(Map, (first, second) ->
                    CASE WHEN first['Counts'] > second['Counts'] THEN -1 ELSE 1 END), 1)['Dist']"""))

Output:

Note that I added a row with ID Z holding an empty array for testing. You can also remove the Map column by appending .drop("Map") to out, as shown after the output below.

out.show(truncate=False)

+---+---------------------------------------------+--------------------------------------+---------------+
|ID |Elements                                     |Map                                   |Output_column  |
+---+---------------------------------------------+--------------------------------------+---------------+
|X  |[Element5, Element1, Element5]               |[{Element5, 2}, {Element1, 1}]        |Element5       |
|Y  |[Element Unknown, Element Unknown, Element_Z]|[{Element Unknown, 2}, {Element_Z, 1}]|Element Unknown|
|Z  |[]                                           |[]                                    |null           |
+---+---------------------------------------------+--------------------------------------+---------------+
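
If the intermediate column is not needed, Map can be dropped once Output_column is computed:

out = out.drop("Map")  # keep only ID, Elements and Output_column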

For lower Spark versions, a UDF with statistics.mode can be used:

from pyspark.sql import functions as F, types as T
from statistics import mode

# mode() returns the most frequent element; guard against empty arrays.
u = F.udf(lambda x: mode(x) if len(x) > 0 else None, T.StringType())

df.withColumn("Output", u("Elements")).show(truncate=False)
+---+---------------------------------------------+---------------+
|ID |Elements                                     |Output         |
+---+---------------------------------------------+---------------+
|X  |[Element5, Element1, Element5]               |Element5       |
|Y  |[Element Unknown, Element Unknown, Element_Z]|Element Unknown|
|Z  |[]                                           |null           |
+---+---------------------------------------------+---------------+
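
One caveat: on Python versions before 3.8, statistics.mode raises StatisticsError when there is a tie; from 3.8 onward it returns the first mode encountered. Below is a sketch of an alternative UDF using collections.Counter, which always resolves ties to the first-seen element (the name most_common_udf is mine, not part of the original answer):

from collections import Counter
from pyspark.sql import functions as F, types as T

# Counter.most_common(1) returns [(value, count)]; with ties, the element
# seen first in the list wins. Empty arrays map to null.
most_common_udf = F.udf(
    lambda x: Counter(x).most_common(1)[0][0] if x else None,
    T.StringType(),
)

df.withColumn("Output", most_common_udf("Elements")).show(truncate=False)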