Efficient way to find the most common value for each key in a Spark DataFrame/Dataset

A.B*_*A.B (2) · scala, apache-spark, apache-spark-sql, apache-spark-dataset

Question: I'm having trouble mapping each key to its most common value in Spark (using Scala). I've done it with an RDD, but I don't know how to do it efficiently with a DataFrame/Dataset (Spark SQL).

The dataset looks like:

key1 = value_a
key1 = value_b
key1 = value_b
key2 = value_a
key2 = value_c
key2 = value_c
key3 = value_a

After the Spark transformation, the output should map each key to its most common value.

Output:

key1 = value_b
key2 = value_c
key3 = value_a

What I have tried so far:

RDD

I tried mapping to ((key, value), count) pairs and reducing by key in an RDD, and the logic works, but I can't translate it into Spark SQL (DataFrame/Dataset), and I want minimal shuffling across the network.

Here is my RDD code:

val data = List(
  "key1,value_a",
  "key1,value_b",
  "key1,value_b",
  "key2,value_a",
  "key2,value_c",
  "key2,value_c",
  "key3,value_a"
)

val sparkConf = new SparkConf().setMaster("local").setAppName("example")
val sc = new SparkContext(sparkConf)

val lineRDD = sc.parallelize(data)

// split each "key,value" line into a (key, value) pair
val pairedRDD = lineRDD.map { line =>
  val fields = line.split(",")
  (fields(0), fields(1))
}

// attach a count of 1 to each (key, value) pair and sum the counts
val sumRDD = pairedRDD
  .map { case (key, value) => ((key, value), 1) }
  .reduceByKey(_ + _)

// for each key, keep the value with the highest count
val resultsRDD = sumRDD
  .map { case ((key, value), count) => (key, (value, count)) }
  .groupByKey()
  .map { case (key, valueCounts) => (key, valueCounts.maxBy(_._2)) }

resultsRDD.collect().foreach(println)

DataFrame, using windowing: I am trying Window.partitionBy("key", "value") with count over the window, and then sorting and agg() separately.
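A minimal sketch of that windowing idea, assuming a dataframe df with columns key and value (the variable and column names here are illustrative, not from the question): count each (key, value) pair once, then rank the values within each key by that count with row_number and keep the top-ranked row.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// count each (key, value) pair, then rank values within each key by that count
val mostCommon = df
  .groupBy("key", "value").count()
  .withColumn("rn", row_number().over(Window.partitionBy("key").orderBy(desc("count"))))
  .filter(col("rn") === 1)        // keep only the highest-count value per key
  .select("key", "value")
```

Because row_number is deterministic within each window partition (after the orderBy), ties are broken consistently, unlike relying on the order of rows after a groupBy.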

Answer by Ram*_*jan (6):

From what I understand of your question, here is what you can do.

First, you have to read the data and convert it to a dataframe:

val df = sc.textFile("path to the data file")   // read the file line by line
  .map(line => line.split("="))                 // split each line on "="
  .map(array => (array(0).trim, array(1).trim)) // build a (key, value) tuple
  .toDF("key", "value")                         // convert the RDD to a dataframe; requires import sqlContext.implicits._

This gives:

+----+-------+
|key |value  |
+----+-------+
|key1|value_a|
|key1|value_b|
|key1|value_b|
|key2|value_a|
|key2|value_c|
|key2|value_c|
|key3|value_a|
+----+-------+

The next step is to count how many times each value occurs for each key, and then pick, per key, the value with the highest count. This can be done with a Window function and aggregations as below:

import org.apache.spark.sql.expressions._                   // import the Window library
def windowSpec = Window.partitionBy("key", "value")         // define a window frame for the aggregation
import org.apache.spark.sql.functions._                     // import the built-in functions
df.withColumn("count", count("value").over(windowSpec))     // count the repetitions of each (key, value) pair in a new "count" column
  .orderBy($"count".desc)                                   // order the dataframe by count, descending
  .groupBy("key")                                           // group by key
  .agg(first("value").as("value"))                          // take the first row of each key, i.e. the one with the highest count

So the final output should be:

+----+-------+
|key |value  |
+----+-------+
|key3|value_a|
|key1|value_b|
|key2|value_c|
+----+-------+ 
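Since the question asks for minimal shuffling, one alternative sketch (not from the answer above; df and its key/value columns are assumed as before) replaces the window with two plain aggregations: count each (key, value) pair, then take the max of a (count, value) struct per key. Structs compare field by field, so the max carries the value with the highest count, and only the aggregation shuffles are needed.

```scala
import org.apache.spark.sql.functions._

// count each (key, value) pair, then take, per key, the largest
// (count, value) struct, whose "value" field is the most frequent value
val result = df
  .groupBy("key", "value").count()
  .groupBy("key")
  .agg(max(struct(col("count"), col("value"))).as("top"))
  .select(col("key"), col("top.value").as("value"))
```

On ties, the struct comparison falls back to comparing the value strings themselves, which at least makes the result deterministic.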