scala引发了如何获得最新一天的记录

Wic*_*ian 4 scala apache-spark

data=
"""
user date      item1 item2
1    2015-12-01 14  5.6
1    2015-12-01 10  0.6
1    2015-12-02 8   9.4
1    2015-12-02 90  1.3
2    2015-12-01 30  0.3
2    2015-12-01 89  1.2
2    2015-12-30 70  1.9
2    2015-12-31 20  2.5
3    2015-12-01 19  9.3
3    2015-12-01 40  2.3
3    2015-12-02 13  1.4
3    2015-12-02 50  1.0
3    2015-12-02 19  7.8
"""
Run Code Online (Sandbox Code Playgroud)

如果我有上面的数据,我怎样才能得到每个用户的最新一天的记录?我试着使用groupByKey,但不知道.

val user = data.map{
case(user,date,item1,item2)=>((user,date),Array(item1,item2))
}.groupByKey()
Run Code Online (Sandbox Code Playgroud)

然后我不知道如何处理它.谁能给我一些建议?非常感谢:)

更新:

我改变了我的数据,现在用户在最近一天有几条记录,我希望得到所有这些记录.谢谢:)

第二次更新:

我想得到的结果是:

user1 (2015-12-02,Array(8,9.4),Array(90,1.3))
user2 (2015-12-31,Array(20,2.5))
user3 (2015-12-02,Array(13,1.4),Array(50,1.0),Array(19,7,8))
Run Code Online (Sandbox Code Playgroud)

现在我写了一些代码:

val data2=data.trim.split("\\n").map(_split("\\s+")).map{
f=>{(f(0),ArrayBuffer(
                    f(1),
                    f(2).toInt,
                    f(3).toDouble)
    )}
}
val data3 = sc.parallelize(data2)
data3.reduceByKey((x,y)=>
             if(x(0).toString.compareTo(y(0).toString)>=0) x++=y
                  else y).foreach(println)
Run Code Online (Sandbox Code Playgroud)

结果是:

(2,ArrayBuffer(2015-12-31, 20, 2.5))
(1,ArrayBuffer(2015-12-02, 8, 9.4, 2015-12-02, 90, 1.3))
(3,ArrayBuffer(2015-12-02, 13, 1.4, 2015-12-02, 50, 1.0, 2015-12-02, 19, 7.8))
Run Code Online (Sandbox Code Playgroud)

有什么可以改善它吗?:)

Dav*_*ust 5

我认为最好的方法是输入数据映射到的元组的RDD (user, (date, item1, item2))所以RDD会userRdd: RDD[(Int, (Date, Int, Double))]

从这里你可以创建一个reducer,它将采用两个元组并生成另一个相同的格式,即具有更大日期值的元组:

reduceMaxDate(a: (Date, Int, Double), b: (Date, Int, Double)) : (Date, Int, Double) = {
     if(a._1 > b._1) a else b
} 
Run Code Online (Sandbox Code Playgroud)

在这里,您可以通过以下方式找到每个用户的最大值:

userRdd.reduceByKey(reduceMaxDate).
Run Code Online (Sandbox Code Playgroud)

这将产生具有每个用户的最大时间戳的元组.