小编RHe*_*utz的帖子

填补时间序列Spark的空白

处理时间序列数据时遇到问题.由于电源故障,数据集中缺少某些时间戳.我需要通过添加行来填补这些空白,然后我可以插入缺失的值.

输入数据:

periodstart                usage
---------------------------------
2015-09-11 02:15           23000   
2015-09-11 03:15           23344   
2015-09-11 03:30           23283  
2015-09-11 03:45           23786   
2015-09-11 04:00           25039
Run Code Online (Sandbox Code Playgroud)

通缉输出:

periodstart                usage
---------------------------------
2015-09-11 02:15           23000   
2015-09-11 02:30           0   
2015-09-11 02:45           0   
2015-09-11 03:00           0   
2015-09-11 03:15           23344   
2015-09-11 03:30           23283   
2015-09-11 03:45           23786   
2015-09-11 04:00           25039  
Run Code Online (Sandbox Code Playgroud)

现在我已在数据集foreach函数中使用while循环修复此问题.问题是我必须首先将数据集收集到驱动程序才能执行while循环.所以这不是Spark的正确方法.

有人能给我一个更好的解决方案吗?

这是我的代码:

MissingMeasurementsDS.collect().foreach(row => {
  // empty list for new generated measurements
  val output = ListBuffer.empty[Measurement]
  // Missing measurements
  val missingMeasurements = row.getAs[Int]("missingmeasurements")
  val lastTimestamp = row.getAs[Timestamp]("previousperiodstart")
  //Generate missing timestamps
  var …
Run Code Online (Sandbox Code Playgroud)

scala time-series apache-spark apache-spark-sql

6
推荐指数
1
解决办法
4256
查看次数