Kep*_*ler 3 scala mapreduce apache-spark
我有一个RDD就好((String, String), TimeStamp).我有大量的记录,我想为每个键选择具有最新TimeStamp值的记录.我已经尝试了以下代码,仍然在努力解决这个问题.任何人都可以帮我这样做吗?
我尝试下面的代码是错误的,不能正常工作
val context = sparkSession.read.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", url)
.option("dbtable", "student_risk")
.option("user", "user")
.option("password", "password")
.load()
context.cache();
val studentRDD = context.rdd.map(r => ((r.getString(r.fieldIndex("course_id")), r.getString(r.fieldIndex("student_id"))), r.getTimestamp(r.fieldIndex("risk_date_time"))))
val filteredRDD = studentRDD.collect().map(z => (z._1, z._2)).reduce((x, y) => (x._2.compareTo(y._2)))
Run Code Online (Sandbox Code Playgroud)
直接在DataFrame上进行操作很简单(context这里有一个奇怪的名字):
val result = context
.groupBy("course_id", "student_id")
.agg(min("risk_date_time") as "risk_date_time")
Run Code Online (Sandbox Code Playgroud)
然后你可以像以前一样将它转换为RDD(如果需要) - 结果具有相同的模式.
如果您想通过RDD执行此操作,请使用reduceByKey:
studentRDD.reduceByKey((t1, t2) => if (t1.before(t2)) t1 else t2)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1341 次 |
| 最近记录: |