处理时间序列数据时遇到问题.由于电源故障,数据集中缺少某些时间戳.我需要通过添加行来填补这些空白,然后我可以插入缺失的值.
输入数据:
periodstart usage
---------------------------------
2015-09-11 02:15 23000
2015-09-11 03:15 23344
2015-09-11 03:30 23283
2015-09-11 03:45 23786
2015-09-11 04:00 25039
Run Code Online (Sandbox Code Playgroud)
通缉输出:
periodstart usage
---------------------------------
2015-09-11 02:15 23000
2015-09-11 02:30 0
2015-09-11 02:45 0
2015-09-11 03:00 0
2015-09-11 03:15 23344
2015-09-11 03:30 23283
2015-09-11 03:45 23786
2015-09-11 04:00 25039
Run Code Online (Sandbox Code Playgroud)
现在我已在数据集foreach函数中使用while循环修复此问题.问题是我必须首先将数据集收集到驱动程序才能执行while循环.所以这不是Spark的正确方法.
有人能给我一个更好的解决方案吗?
这是我的代码:
MissingMeasurementsDS.collect().foreach(row => {
// empty list for new generated measurements
val output = ListBuffer.empty[Measurement]
// Missing measurements
val missingMeasurements = row.getAs[Int]("missingmeasurements")
val lastTimestamp = row.getAs[Timestamp]("previousperiodstart")
//Generate missing timestamps
var …Run Code Online (Sandbox Code Playgroud)