Apache Flink:创建一个滞后数据流

Kau*_*chy 2 scala apache-flink flink-streaming

我刚刚开始使用Scala的Apache Flink.有人可以告诉我如何从我当前的数据流创建滞后流(滞后k事件或k单位时间)?

基本上,我想在数据流上实现自动回归模型(在流上使用时间滞后版本的线性回归).因此,需要一种类似于以下伪代码的方法.

val ds : DataStream = ...

val laggedDS : DataStream = ds.map(lag _)

def lag(ds : DataStream, k : Time) : DataStream = {

}
Run Code Online (Sandbox Code Playgroud)

如果每个事件的间隔为1秒,并且有2秒的延迟,我希望样本输入和输出如下.

输入:1,2,3,4,5,6,7 ...
输出:NA,NA,1,2,3,4,5 ......

Fab*_*ske 5

鉴于我的要求是正确的,我将把它实现为FlatMapFunction一个FIFO队列.k每当新事件到达时,队列缓冲事件并发出头部.如果您需要容错流应用程序,则必须将队列注册为状态.然后,Flink将负责检查状态(即队列)并在发生故障时恢复它.

FlatMapFunction看起来是这样的:

class Lagger(val k: Int) 
    extends FlatMapFunction[X, X] 
    with Checkpointed[mutable.Queue[X]] 
{

  var fifo: mutable.Queue[X] = new mutable.Queue[X]()

  override def flatMap(value: X, out: Collector[X]): Unit = {
    // add new element to queue
    fifo.enqueue(value)
    if (fifo.size == k + 1) {
      // remove head element and emit
      out.collect(fifo.dequeue())
    }
  }

  // restore state
  override def restoreState(state: mutable.Queue[X]) = { fifo = state }

  // get state to checkpoint
  override def snapshotState(cId: Long, cTS: Long): mutable.Queue[X] = fifo

}
Run Code Online (Sandbox Code Playgroud)

返回具有时滞的元素更复杂.这将需要用于发射的定时器线程,因为该函数仅在新元素到达时被调用.