如何在Spark-scala中实现LEAD和LAG

use*_*589 4 scala apache-spark

我有最后的记录(后加入和过滤)的火花dataframe.I需要比较连续行的(通过键分区)列值,并根据病情需要改变e_date列的值,例如:

    sample table
    key1 key 2   col1   col2   s_date      e_date
     a     1      cv1     cv2   2014         2099 
     a     1      cv3     cv2   2016         2099 
     b     2      cv5     cv6   2016         2099
     b     2      cv5     cv6   2016         2099

   final table should look like 
    key1 key 2   col1   col2   s_date      e_date
     a     1      cv1     cv2   2014         2015  (next records s_date-1) 
     a     1      cv3     cv2   2016         2099 
     b     2      cv5     cv6   2016         2099
Run Code Online (Sandbox Code Playgroud)
  1. 上面的表有复合键,所以key1和key2是键

  2. 通过键比较col1和col2值

  3. 如果任何列具有新值结束旧记录,其中新记录的s_date为-1(最终表中的第1,2行)

  4. 如果没有变化则忽略新记录(最终表格中的第3行)

scala-spark中的任何指针

小智 7

领先和滞后已经实施:

import org.apache.spark.sql.functions.{lead, lag}
import org.apache.spark.sql.expressions.Window

lag('s_date, 1).over(Window.partitionBy('key1, 'key2).orderBy('s_date))
Run Code Online (Sandbox Code Playgroud)

有关详细信息,请查看Spark SQL中的窗口函数简介.