如何从scala中的RDD中获取最早的时间戳日期

Kep*_*ler 3 scala mapreduce apache-spark

我有一个RDD就好((String, String), TimeStamp).我有大量的记录,我想为每个键选择具有最新TimeStamp值的记录.我已经尝试了以下代码,仍然在努力解决这个问题.任何人都可以帮我这样做吗?

我尝试下面的代码是错误的,不能正常工作

val context = sparkSession.read.format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", url)
  .option("dbtable", "student_risk")
  .option("user", "user")
  .option("password", "password")
  .load()
context.cache();

val studentRDD = context.rdd.map(r => ((r.getString(r.fieldIndex("course_id")), r.getString(r.fieldIndex("student_id"))), r.getTimestamp(r.fieldIndex("risk_date_time"))))
val filteredRDD = studentRDD.collect().map(z => (z._1, z._2)).reduce((x, y) => (x._2.compareTo(y._2)))
Run Code Online (Sandbox Code Playgroud)

Tza*_*har 7

直接在DataFrame上进行操作很简单(context这里有一个奇怪的名字):

val result = context
  .groupBy("course_id", "student_id")
  .agg(min("risk_date_time") as "risk_date_time")
Run Code Online (Sandbox Code Playgroud)

然后你可以像以前一样将它转换为RDD(如果需要) - 结果具有相同的模式.

如果您想通过RDD执行此操作,请使用reduceByKey:

studentRDD.reduceByKey((t1, t2) => if (t1.before(t2)) t1 else t2)
Run Code Online (Sandbox Code Playgroud)