Scala spark通过键减少并找到共同的价值

VSE*_*GHP 4 hadoop scala apache-spark

我有一个csv数据文件存储在HDFS上的sequenceFile中,格式为name, zip, country, fav_food1, fav_food2, fav_food3, fav_colour.可能有许多具有相同名称的条目,我需要找出他们最喜欢的食物是什么(即计算所有具有该名称的记录中的所有食物条目并返回最受欢迎的食物.我是Scala和Spark的新手并拥有彻底的多个教程和搜索论坛,但我仍然坚持如何继续.到目前为止,我已经得到了文本到字符串格式的序列文件,然后过滤了条目

以下是文件中一行的示例数据条目

Bob,123,USA,Pizza,Soda,,Blue
Bob,456,UK,Chocolate,Cheese,Soda,Green
Bob,12,USA,Chocolate,Pizza,Soda,Yellow
Mary,68,USA,Chips,Pasta,Chocolate,Blue
Run Code Online (Sandbox Code Playgroud)

所以输出应该是元组(Bob,Soda),因为苏打在Bob的条目中出现次数最多.

import org.apache.hadoop.io._

var lines  = sc.sequenceFile("path",classOf[LongWritable],classOf[Text]).values.map(x => x.toString())
// converted to string since I could not get filter to run on Text and removing the longwritable

var filtered = lines.filter(_.split(",")(0) == "Bob");
// removed entries with all other users

var f_tuples = filtered.map(line => lines.split(",");
// split all the values

var f_simple = filtered.map(line => (line(0), (line(3), line(4), line(5))
// removed unnecessary fields
Run Code Online (Sandbox Code Playgroud)

我现在的问题是,我认为我有这种[<name,[f,f,f]>]结构,并不知道如何进行压扁并获得最受欢迎的食物.我需要组合所有条目,所以我有一个带a的条目,然后获取值中最常见的元素.任何帮助,将不胜感激.谢谢

我试过这个让它变得扁平化,但似乎我尝试的越多,数据结构就越复杂.

var f_trial = fpairs.groupBy(_._1).mapValues(_.map(_._2))
// the resulting structure was of type org.apache.spark.rdd.RDD[(String, Interable[(String, String, String)]
Run Code Online (Sandbox Code Playgroud)

这是一个记录的println在f_trial之后的样子

("Bob", List((Pizza, Soda,), (Chocolate, Cheese, Soda), (Chocolate, Pizza, Soda)))
Run Code Online (Sandbox Code Playgroud)

括号细分

("Bob", 

List(

(Pizza, Soda, <missing value>),

(Chocolate, Cheese, Soda),

(Chocolate, Pizza, Soda)

) // ends List paren

) // ends first paren
Run Code Online (Sandbox Code Playgroud)

The*_*aul 6

我找时间了.建立:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf

    val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
    val sc = new SparkContext(conf)

    val data = """   
  Bob,123,USA,Pizza,Soda,,Blue
  Bob,456,UK,Chocolate,Cheese,Soda,Green
  Bob,12,USA,Chocolate,Pizza,Soda,Yellow
  Mary,68,USA,Chips,Pasta,Chocolate,Blue
  """.trim

    val records = sc.parallelize(data.split('\n'))
Run Code Online (Sandbox Code Playgroud)

提取食物选择,并为每个选择制作一个元组 ((name, food), 1)

    val r2 = records.flatMap { r =>
      val Array(name, id, country, food1, food2, food3, color) = r.split(',');
      List(((name, food1), 1), ((name, food2), 1), ((name, food3), 1))
    }
Run Code Online (Sandbox Code Playgroud)

每个名称/食物组合总计:

    val r3 = r2.reduceByKey((x, y) => x + y)
Run Code Online (Sandbox Code Playgroud)

重新映射,以便名称(仅)是关键

    val r4 = r3.map { case ((name, food), total) => (name, (food, total)) }
Run Code Online (Sandbox Code Playgroud)

选择每一步计数最多的食物

    val res = r4.reduceByKey((x, y) => if (y._2 > x._2) y else x)
Run Code Online (Sandbox Code Playgroud)

我们已经完成了

    println(res.collect().mkString)
    //(Mary,(Chips,1))(Bob,(Soda,3))
Run Code Online (Sandbox Code Playgroud)

编辑:要收集所有人数最多的食品,我们只需更改最后两行:

从包含总计的项目列表开始:

val r5 = r3.map { case ((name, food), total) => (name, (List(food), total)) }
Run Code Online (Sandbox Code Playgroud)

在相同的情况下,将食品项目列表与该分数连接起来

val res2 = r5.reduceByKey((x, y) => if (y._2 > x._2) y 
                                    else if (y._2 < x._2) x
                                    else (y._1:::x._1, y._2))

//(Mary,(List(Chocolate, Pasta, Chips),1))
//(Bob,(List(Soda),3))
Run Code Online (Sandbox Code Playgroud)

如果你想要前三名,比如说,然后aggregateByKey用来组装每人最喜欢的食物清单,而不是第二名reduceByKey