Spark DataFrame: difference between Timestamp columns of consecutive rows

Ram*_*ami 6 apache-spark spark-dataframe

I have a DataFrame that looks like this:

+---+---------------------+---------------------+
|id |initDate             |endDate              |
+---+---------------------+---------------------+
|138|2016-04-15 00:00:00.0|2016-04-28 00:00:00.0|
|138|2016-05-09 00:00:00.0|2016-05-23 00:00:00.0|
|138|2016-06-04 00:00:00.0|2016-06-18 00:00:00.0|
|138|2016-06-18 00:00:00.0|2016-07-02 00:00:00.0|
|138|2016-07-09 00:00:00.0|2016-07-23 00:00:00.0|
|138|2016-07-27 00:00:00.0|2016-08-10 00:00:00.0|
|138|2016-08-18 00:00:00.0|2016-09-01 00:00:00.0|
|138|2016-09-13 00:00:00.0|2016-09-27 00:00:00.0|
|138|2016-10-04 00:00:00.0|null                 |
+---+---------------------+---------------------+

The rows are ordered by id and then by initDate in ascending order. Both the initDate and endDate columns are of Timestamp type. For illustration, I only show the records belonging to a single id value.

My goal is to add a new column that shows, for each id, the difference in days between the initDate of each row and the endDate of the previous row.

If there is no previous row, the value should be -1.

The output should look like this:

+---+---------------------+---------------------+----------+
|id |initDate             |endDate              |difference|
+---+---------------------+---------------------+----------+
|138|2016-04-15 00:00:00.0|2016-04-28 00:00:00.0|-1        |
|138|2016-05-09 00:00:00.0|2016-05-23 00:00:00.0|11        |
|138|2016-06-04 00:00:00.0|2016-06-18 00:00:00.0|12        |
|138|2016-06-18 00:00:00.0|2016-07-02 00:00:00.0|0         |
|138|2016-07-09 00:00:00.0|2016-07-23 00:00:00.0|7         |
|138|2016-07-27 00:00:00.0|2016-08-10 00:00:00.0|4         |
|138|2016-08-18 00:00:00.0|2016-09-01 00:00:00.0|8         |
|138|2016-09-13 00:00:00.0|2016-09-27 00:00:00.0|12        |
|138|2016-10-04 00:00:00.0|null                 |7         |
+---+---------------------+---------------------+----------+

I would like to use a window function to partition the records by id, but I don't know how to do the next step.
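For anyone who wants to reproduce the setup, here is a minimal sketch of how the sample data above could be built as a DataFrame. It assumes a SparkSession named `spark`, uses the column names from the question, and shows only a few of the rows for brevity; the timestamps are parsed from strings purely for illustration.

// Sketch only: build a small DataFrame matching the question's schema.
// Assumes an existing SparkSession named `spark`.
import spark.implicits._

val rows: Seq[(Int, Option[String], Option[String])] = Seq(
  (138, Some("2016-04-15 00:00:00"), Some("2016-04-28 00:00:00")),
  (138, Some("2016-05-09 00:00:00"), Some("2016-05-23 00:00:00")),
  (138, Some("2016-10-04 00:00:00"), None)          // last row has no endDate
)

val filteredDF = rows.toDF("id", "initDate", "endDate")
  .withColumn("initDate", $"initDate".cast("timestamp"))
  .withColumn("endDate", $"endDate".cast("timestamp"))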

Ram*_*ami 6

Thanks to the hint from @lostInOverflow, I came up with the following solution:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import spark.implicits._   // for the $"col" syntax; assumes a SparkSession named `spark`

// Partition by id and order each partition by initDate
val w = Window.partitionBy("id").orderBy("initDate")
// endDate of the previous row within the partition (null for the first row)
val previousEnd = lag($"endDate", 1).over(w)

filteredDF.withColumn("prev", previousEnd)
          .withColumn("difference", datediff($"initDate", $"prev"))
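The snippet above leaves the difference as null on the first row of each id. To get the -1 value the question asks for, one possible addition (a sketch, not part of the original answer) is to coalesce the day difference with a literal -1:

// Sketch only: default the first row of each id to -1, as the question requires.
filteredDF.withColumn("prev", previousEnd)
          .withColumn("difference",
            coalesce(datediff($"initDate", $"prev"), lit(-1)))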


小智 5

Try:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import spark.implicits._   // for the $"col" syntax; assumes a SparkSession named `spark`

val w = Window.partitionBy("id").orderBy("endDate")

// datediff (not date_sub) computes the difference in days between two date/timestamp columns
df.withColumn("difference", datediff($"initDate", lag($"endDate", 1).over(w)))
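Note on the correction: datediff(end, start) returns the number of days between two date or timestamp columns as an integer, whereas date_sub(start, days) shifts a date backwards by a fixed number of days, so datediff is the function that matches the question's goal. As with the accepted answer, the first row of each partition still yields null, which can be replaced with -1 as shown earlier.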