查找 Spark 数据框中多个字段的前 n 个结果

Mar*_*tin 4 apache-spark apache-spark-sql pyspark

我有一个像这样的数据框:

name  field1  field2  field3
a     4       10      8 
b     5       0       11
c     10      7       4
d     0       1       5
Run Code Online (Sandbox Code Playgroud)

我需要找到每个字段的前 3 个名称。

预期输出:

top3-field1  top3-field2  top3-field3
c            a            b
b            c            a
a            d            d
Run Code Online (Sandbox Code Playgroud)

因此,我尝试对 field(n) 列值进行排序,限制前 3 个结果并使用withColumn方法生成新列,如下所示:

df1 = df.orderBy(f.col("field1").desc(), "name") \
.limit(3) \
.withColumn("top3-field1", df["name"]) \
.select("top3-field1", "field1")
Run Code Online (Sandbox Code Playgroud)

使用这种方法,我必须为每个字段(n)创建不同的数据帧,然后将它们连接起来以获得如上所述的结果。我觉得这个问题必须有更好的解决方案。希望有人能给我建议

ank*_*_91 6

您可以先堆叠 df,然后按降序排列排名,然后过滤掉小于或等于 3 的排名,最后对名称进行透视:

请注意,我在代码中使用此函数是为了使堆叠本身的输入变得更容易:


from pyspark.sql import functions as F, Window as W #imports

w = W.partitionBy("col").orderBy(F.desc("values"))
out = (df.selectExpr("name",stack_multiple_col(df,df.columns[1:]))
         .withColumn("Rnk",F.dense_rank().over(w))
         .where("Rnk<=3").groupBy("Rnk").pivot("col").agg(F.first("name")))

out.show()

+---+------+------+------+
|Rnk|field1|field2|field3|
+---+------+------+------+
|  1|     c|     a|     b|
|  2|     b|     c|     a|
|  3|     a|     d|     d|
+---+------+------+------+
Run Code Online (Sandbox Code Playgroud)

如果你不愿意使用该功能,你可以写成:

w = W.partitionBy("col").orderBy(F.desc("values"))
out = (df.selectExpr("name",
'stack(3,"field1",field1,"field2",field2,"field3",field3) as (col,values)')
 .withColumn("Rnk",F.dense_rank().over(w))
.where("Rnk<=3").groupBy("Rnk").pivot("col").agg(F.first("name")))
Run Code Online (Sandbox Code Playgroud)

完整代码:

def stack_multiple_col(df,cols=df.columns,output_columns=["col","values"]):
  """stacks multiple columns in a dataframe, 
     takes all columns by default unless passed a list of values"""
  return (f"""stack({len(cols)},{','.join(map(','.join,
         (zip([f'"{i}"' for i in cols],cols))))}) as ({','.join(output_columns)})""")


w = W.partitionBy("col").orderBy(F.desc("values"))
out = (df.selectExpr("name",stack_multiple_col(df,df.columns[1:]))
         .withColumn("Rnk",F.dense_rank().over(w))
      .where("Rnk<=3").groupBy("Rnk").pivot("col").agg(F.first("name")))

out.show()
Run Code Online (Sandbox Code Playgroud)