Wic*_*ian 4 scala apache-spark
data=
"""
user date item1 item2
1 2015-12-01 14 5.6
1 2015-12-01 10 0.6
1 2015-12-02 8 9.4
1 2015-12-02 90 1.3
2 2015-12-01 30 0.3
2 2015-12-01 89 1.2
2 2015-12-30 70 1.9
2 2015-12-31 20 2.5
3 2015-12-01 19 9.3
3 2015-12-01 40 2.3
3 2015-12-02 13 1.4
3 2015-12-02 50 1.0
3 2015-12-02 19 7.8
"""
Run Code Online (Sandbox Code Playgroud)
如果我有上面的数据,我怎样才能得到每个用户的最新一天的记录?我试着使用groupByKey,但不知道.
val user = data.map{
case(user,date,item1,item2)=>((user,date),Array(item1,item2))
}.groupByKey()
Run Code Online (Sandbox Code Playgroud)
然后我不知道如何处理它.谁能给我一些建议?非常感谢:)
我改变了我的数据,现在用户在最近一天有几条记录,我希望得到所有这些记录.谢谢:)
我想得到的结果是:
user1 (2015-12-02,Array(8,9.4),Array(90,1.3))
user2 (2015-12-31,Array(20,2.5))
user3 (2015-12-02,Array(13,1.4),Array(50,1.0),Array(19,7,8))
Run Code Online (Sandbox Code Playgroud)
现在我写了一些代码:
val data2=data.trim.split("\\n").map(_split("\\s+")).map{
f=>{(f(0),ArrayBuffer(
f(1),
f(2).toInt,
f(3).toDouble)
)}
}
val data3 = sc.parallelize(data2)
data3.reduceByKey((x,y)=>
if(x(0).toString.compareTo(y(0).toString)>=0) x++=y
else y).foreach(println)
Run Code Online (Sandbox Code Playgroud)
结果是:
(2,ArrayBuffer(2015-12-31, 20, 2.5))
(1,ArrayBuffer(2015-12-02, 8, 9.4, 2015-12-02, 90, 1.3))
(3,ArrayBuffer(2015-12-02, 13, 1.4, 2015-12-02, 50, 1.0, 2015-12-02, 19, 7.8))
Run Code Online (Sandbox Code Playgroud)
有什么可以改善它吗?:)
我认为最好的方法是输入数据映射到的元组的RDD (user, (date, item1, item2))所以RDD会userRdd: RDD[(Int, (Date, Int, Double))]
从这里你可以创建一个reducer,它将采用两个元组并生成另一个相同的格式,即具有更大日期值的元组:
reduceMaxDate(a: (Date, Int, Double), b: (Date, Int, Double)) : (Date, Int, Double) = {
if(a._1 > b._1) a else b
}
Run Code Online (Sandbox Code Playgroud)
在这里,您可以通过以下方式找到每个用户的最大值:
userRdd.reduceByKey(reduceMaxDate).
Run Code Online (Sandbox Code Playgroud)
这将产生具有每个用户的最大时间戳的元组.
| 归档时间: |
|
| 查看次数: |
3502 次 |
| 最近记录: |