Ram*_*ami 6 apache-spark spark-dataframe
我有一个DateFrame如下:
+---+---------------------+---------------------+
|id |initDate |endDate |
+---+---------------------+---------------------+
|138|2016-04-15 00:00:00.0|2016-04-28 00:00:00.0|
|138|2016-05-09 00:00:00.0|2016-05-23 00:00:00.0|
|138|2016-06-04 00:00:00.0|2016-06-18 00:00:00.0|
|138|2016-06-18 00:00:00.0|2016-07-02 00:00:00.0|
|138|2016-07-09 00:00:00.0|2016-07-23 00:00:00.0|
|138|2016-07-27 00:00:00.0|2016-08-10 00:00:00.0|
|138|2016-08-18 00:00:00.0|2016-09-01 00:00:00.0|
|138|2016-09-13 00:00:00.0|2016-09-27 00:00:00.0|
|138|2016-10-04 00:00:00.0|null |
+---+---------------------+---------------------+
Run Code Online (Sandbox Code Playgroud)
该行是通过有序id然后initDate按升序排列列.两列initDate和endDate列都具有Timestamp类型.为了便于说明,我只显示了属于一个id值的记录.
我的目标是增加一个新列,显示每个id之间的差值(以天为单位的短期)initDate每行和endDate上一行.
如果没有前一行,则该值将为-1.
输出应如下所示:
+---+---------------------+---------------------+----------+
|id |initDate |endDate |difference|
+---+---------------------+---------------------+----------+
|138|2016-04-15 00:00:00.0|2016-04-28 00:00:00.0|-1 |
|138|2016-05-09 00:00:00.0|2016-05-23 00:00:00.0|11 |
|138|2016-06-04 00:00:00.0|2016-06-18 00:00:00.0|12 |
|138|2016-06-18 00:00:00.0|2016-07-02 00:00:00.0|0 |
|138|2016-07-09 00:00:00.0|2016-07-23 00:00:00.0|7 |
|138|2016-07-27 00:00:00.0|2016-08-10 00:00:00.0|4 |
|138|2016-08-18 00:00:00.0|2016-09-01 00:00:00.0|8 |
|138|2016-09-13 00:00:00.0|2016-09-27 00:00:00.0|12 |
|138|2016-10-04 00:00:00.0|null |7 |
+---+---------------------+---------------------+----------+
Run Code Online (Sandbox Code Playgroud)
我想用一个窗口函数来分区记录id,但我不知道如何做下一步.
由于@lostInOverflow的提示,我想出了以下解决方案:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val w = Window.partitionBy("id").orderBy("initDate")
val previousEnd = lag($"endDate", 1).over(w)
filteredDF.withColumn("prev", previousEnd)
.withColumn("difference", datediff($"initDate", $"prev"))
Run Code Online (Sandbox Code Playgroud)
小智 5
尝试:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val w = Window.partitionBy("id").orderBy("endDate")
df.withColumn("difference", date_sub($"initDate", lag($"endDate", 1).over(w)))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3519 次 |
| 最近记录: |