如果自上次事件后没有更改传感器读数,如何计算窗口上的聚合?

Chr*_*now 9 apache-spark spark-streaming spark-structured-streaming

如果传感器值自上次事件发生变化后仅发送新事件,我如何计算窗口上的聚合?传感器读数在固定时间进行,例如每5秒进行一次,但仅在读数自上次读数后发生变化时才转发.

所以,如果我想为每个设备创建一个signal_stength的平均值:

eventsDF = ... 
avgSignalDF = eventsDF.groupBy("deviceId").avg("signal_strength")
Run Code Online (Sandbox Code Playgroud)

例如,设备在一分钟窗口内发送的事件:

event_time  device_id  signal_strength
12:00:00    1          5
12:00:05    1          4
12:00:30    1          5
12:00:45    1          6
12:00:55    1          5
Run Code Online (Sandbox Code Playgroud)

填充了实际未发送的事件的相同数据集:

event_time  device_id  signal_strength
12:00:00    1          5
12:00:05    1          4
12:00:10    1          4
12:00:15    1          4
12:00:20    1          4
12:00:25    1          4
12:00:30    1          5
12:00:35    1          5
12:00:40    1          5
12:00:45    1          6
12:00:50    1          6
12:00:55    1          5
Run Code Online (Sandbox Code Playgroud)

该signal_strength sum57avg57/12

如何通过火花结构化流媒体推断这些缺失数据,并根据推断值计算平均值?

注意:我使用average作为聚合的示例,但解决方案需要适用于任何聚合函数.

vde*_*dep 2

编辑:

我修改了逻辑以仅根据过滤后的值计算平均值dataframe,从而解决了差距。

//input structure
case class StreamInput(event_time: Long, device_id: Int, signal_strength: Int)
//columns for which we want to maintain state
case class StreamState(prevSum: Int, prevRowCount: Int, prevTime: Long, prevSignalStrength: Int, currentTime: Long, totalRow: Int, totalSum: Int, avg: Double)
//final result structure
case class StreamResult(event_time: Long, device_id: Int, signal_strength: Int, avg: Double)

val filteredDF = ???  //get input(filtered rows only)

val interval = 5  // event_time interval

// using .mapGroupsWithState to maintain state for runningSum & total row count till now

// you need to set the timeout threshold to indicate how long you wish to maintain the state
val avgDF = filteredDF.groupByKey(_.device_id)
  .mapGroupsWithState[StreamState, StreamResult](GroupStateTimeout.NoTimeout()) {

  case (id: Int, eventIter: Iterator[StreamInput], state: GroupState[StreamState]) => {
    val events = eventIter.toSeq

    val updatedSession = if (state.exists) {
      //if state exists update the state with the new values
      val existingState = state.get

      val prevTime = existingState.currentTime
      val currentTime = events.map(x => x.event_time).last
      val currentRowCount = (currentTime - prevTime)/interval
      val rowCount = existingState.rowCount + currentRowCount.toInt
      val currentSignalStength = events.map(x => x.signal_strength).last

      val total_signal_strength = currentSignalStength + 
        (existingState.prevSignalStrength * (currentRowCount -1)) + 
        existingState.total_signal_strength

      StreamState(
        existingState.total_signal_strength,
        existingState.rowCount,
        prevTime,
        currentSignalStength,
        currentTime,
        rowCount,
        total_signal_strength.toInt,
        total_signal_strength/rowCount.toDouble
      )

    } else {
      // if there are no earlier state
      val runningSum = events.map(x => x.signal_strength).sum
      val size = events.size.toDouble
      val currentTime = events.map(x => x.event_time).last
      StreamState(0, 1, 0, runningSum, currentTime, 1, runningSum, runningSum/size)
    }

    //save the updated state
    state.update(updatedSession)
    StreamResult(
      events.map(x => x.event_time).last,
      id,
      events.map(x => x.signal_strength).last,
      updatedSession.avg
    )
  }
}

val result = avgDF
  .writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start
Run Code Online (Sandbox Code Playgroud)

这个想法是计算两个新的列:

  1. TotalRowCount:如果尚未过滤,则应该存在的行数的运行总数。
  2. Total_signal_strength:signal_strength到目前为止的运行总计。(这也包括丢失的行总数)。

其计算公式为:

total_signal_strength = 
  current row's signal_strength  +  
  (total_signal_strength of previous row * (rowCount -1)) + 
  //rowCount is the count of missed rows computed by comparing previous and current event_time.
  previous total_signal_strength
Run Code Online (Sandbox Code Playgroud)

中间状态的格式:

+----------+---------+---------------+---------------------+--------+
|event_time|device_id|signal_strength|total_signal_strength|rowCount|
+----------+---------+---------------+---------------------+--------+
|         0|        1|              5|                    5|       1|
|         5|        1|              4|                    9|       2|
|        30|        1|              5|                   30|       7|
|        45|        1|              6|                   46|      10|
|        55|        1|              5|                   57|      12|
+----------+---------+---------------+---------------------+--------+
Run Code Online (Sandbox Code Playgroud)

最终输出:

+----------+---------+---------------+-----------------+
|event_time|device_id|signal_strength|              avg|
+----------+---------+---------------+-----------------+
|         0|        1|              5|              5.0|
|         5|        1|              4|              4.5|
|        30|        1|              5|4.285714285714286|
|        45|        1|              6|              4.6|
|        55|        1|              5|             4.75|
+----------+---------+---------------+-----------------+
Run Code Online (Sandbox Code Playgroud)