在Spark Dataframe中,如何在两个数据框中获取重复记录和不同记录?

She*_*har 6 scala apache-spark

我正在研究一个问题,我将hive表中的数据加载到spark数据帧中,现在我希望1个数据帧中的所有唯一accts和另一个中的所有重复项.例如,如果我有acct id 1,1,2,3,4.我希望在一个数据帧中获得2,3,4,在另一个数据帧中获得1,1.我怎样才能做到这一点?

Aks*_*edi 9

df.groupBy($"field1",$"field2"...).count.filter($"count" > 1).show()
Run Code Online (Sandbox Code Playgroud)


val*_*ner 8

根据您拥有的spark版本,您可以在数据集/ sql中使用窗口函数,如下所示:

Dataset<Row> New = df.withColumn("Duplicate", count("*").over( Window.partitionBy("id") ) );

Dataset<Row> Dups = New.filter(col("Duplicate").gt(1));

Dataset<Row> Uniques = New.filter(col("Duplicate").equalTo(1));
Run Code Online (Sandbox Code Playgroud)

以上是用java编写的。在scala中应该类似,并在python中阅读如何做。 https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html


Kir*_*anM 6

val acctDF = List(("1", "Acc1"), ("1", "Acc1"), ("1", "Acc1"), ("2", "Acc2"), ("2", "Acc2"), ("3", "Acc3")).toDF("AcctId", "Details")
scala> acctDF.show()
+------+-------+
|AcctId|Details|
+------+-------+
|     1|   Acc1|
|     1|   Acc1|
|     1|   Acc1|
|     2|   Acc2|
|     2|   Acc2|
|     3|   Acc3|
+------+-------+

val countsDF = acctDF.map(rec => (rec(0), 1)).reduceByKey(_+_).map(rec=> (rec._1.toString, rec._2)).toDF("AcctId", "AcctCount")

val accJoinedDF = acctDF.join(countsDF, acctDF("AcctId")===countsDF("AcctId"), "left_outer").select(acctDF("AcctId"), acctDF("Details"), countsDF("AcctCount"))

scala> accJoinedDF.show()
+------+-------+---------+   
|AcctId|Details|AcctCount|
+------+-------+---------+
|     1|   Acc1|        3|
|     1|   Acc1|        3|
|     1|   Acc1|        3|
|     2|   Acc2|        2|
|     2|   Acc2|        2|
|     3|   Acc3|        1|
+------+-------+---------+


val distAcctDF = accJoinedDF.filter($"AcctCount"===1)
scala> distAcctDF.show()
+------+-------+---------+   
|AcctId|Details|AcctCount|
+------+-------+---------+
|     3|   Acc3|        1|
+------+-------+---------+

val duplAcctDF = accJoinedDF.filter($"AcctCount">1)
scala> duplAcctDF.show()
+------+-------+---------+                 
|AcctId|Details|AcctCount|
+------+-------+---------+
|     1|   Acc1|        3|
|     1|   Acc1|        3|
|     1|   Acc1|        3|
|     2|   Acc2|        2|
|     2|   Acc2|        2|
+------+-------+---------+

(OR scala> duplAcctDF.distinct.show() )
Run Code Online (Sandbox Code Playgroud)

  • 如果我错了,但您无法在数据帧上使用 map 或 reduceByKey 进行操作,请纠正我,您需要先将 ir 转换为 rdd,最后再转换回 DF。像这样:`val countsDF = acctDF.rdd.map....toDF()` (2认同)