在Spark DataFrame中查找每个组的最大行数

Que*_*det 42 apache-spark apache-spark-sql pyspark

我正在尝试使用Spark数据帧而不是RDD,因为它们看起来比RDD更高级,并且往往会产生更易读的代码.

在一个14节点的Google Dataproc集群中,我有大约6百万个名称被两个不同的系统转换为ID:sasb.每个Row包含name,id_said_sb.我的目标是从生产映射id_said_sb使得对于每id_sa时,相应的id_sb是连接到所有名称中最常见的ID id_sa.

让我们试着用一个例子来澄清.如果我有以下行:

[Row(name='n1', id_sa='a1', id_sb='b1'),
 Row(name='n2', id_sa='a1', id_sb='b2'),
 Row(name='n3', id_sa='a1', id_sb='b2'),
 Row(name='n4', id_sa='a2', id_sb='b2')]
Run Code Online (Sandbox Code Playgroud)

我的目标是从生产映射a1b2.事实上,相关的名称a1n1,n2n3,分别映射b1,b2b2,因此b2是相关联的名称最常见的映射a1.以同样的方式,a2将映射到b2.可以假设总有一个胜利者:不需要打破关系.

我希望我可以使用groupBy(df.id_sa)我的数据帧,但我不知道接下来该做什么.我希望最终会产生以下行的聚合:

[Row(id_sa=a1, max_id_sb=b2),
 Row(id_sa=a2, max_id_sb=b2)]
Run Code Online (Sandbox Code Playgroud)

但也许我正在尝试使用错误的工具,我应该回到使用RDD.

zer*_*323 57

使用join(如果是关系,将导致组中的多个行):

import pyspark.sql.functions as F
from pyspark.sql.functions import count, col 

cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")

cnts.join(maxs, 
  (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))
Run Code Online (Sandbox Code Playgroud)

使用窗口函数(将删除关系):

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

w = Window().partitionBy("id_sa").orderBy(col("cnt").desc())

(cnts
  .withColumn("rn", row_number().over(w))
  .where(col("rn") == 1)
  .select("id_sa", "id_sb"))
Run Code Online (Sandbox Code Playgroud)

使用struct订购:

from pyspark.sql.functions import struct

(cnts
  .groupBy("id_sa")
  .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
  .select(col("id_sa"), col("max.id_sb")))
Run Code Online (Sandbox Code Playgroud)

另请参见如何选择每个组的第一行?

  • 很好的答案,真的为我揭示了这些神秘的窗口功能 (5认同)
  • 你能解释一下结构的排序是如何工作的吗? (4认同)

alg*_*imo 9

我认为您可能正在寻找的是窗口函数:http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight = window # pyspark.sql.Window

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

这是Scala中的一个示例(我现在没有可用的Hive Spark Shell,因此我无法测试代码,但我认为它应该可以工作):

case class MyRow(name: String, id_sa: String, id_sb: String)

val myDF = sc.parallelize(Array(
    MyRow("n1", "a1", "b1"),
    MyRow("n2", "a1", "b2"),
    MyRow("n3", "a1", "b2"),
    MyRow("n1", "a2", "b2")
)).toDF("name", "id_sa", "id_sb")

import org.apache.spark.sql.expressions.Window

val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc)

myDF.withColumn("max_id_b", first(myDF("id_sb")).over(windowSpec).as("max_id_sb")).filter("id_sb = max_id_sb")
Run Code Online (Sandbox Code Playgroud)

使用Window函数可能有更有效的方法来实现相同的结果,但我希望这可以指向正确的方向.