Flink中如何对WindowedStream进行自定义操作?

Piy*_*ava 2 apache-flink flink-streaming

我想在 Flink 中的 WindowedStream 上执行一些操作,比如平均操作。但可用的预定义操作非常有限,例如求和、最小值、最大值等。

val windowedStream = valueStream
                          .keyBy(0)
                          .timeWindow(Time.minutes(5))
                          .sum(2) //Change this to average?
Run Code Online (Sandbox Code Playgroud)

假设我想求平均值,我该怎么做呢?

Fab*_*ske 5

Flink 没有内置函数来计算WindowStream. 您必须WindowFunction为此实施自定义。

最有效的方法是实现一个ReduceFunction计算您想要平均的值的计数和总和,以及一个后续的计算平均值WindowFunction的结果。ReduceFunction使用 aReduceFunction更高效,因为 Flink 将其直接应用于传入值。因此,它会动态聚合值,而不是在窗口中收集它们。这显着减少了窗口的内存占用。

由于 a 的输出ReduceFunction与其输入具有相同的类型,因此您需要在应用ReduceFunction.

像下面这样的东西应该可以解决问题:

val valueStream: DataStream[(String, Double)] = ???

val r: DataStream[(String, Double)] = valueStream
  // append a 1L for counting
  .map(x => (x._1, x._2, 1l))
  // key and window stream
  .keyBy(0).timeWindow(Time.minutes(5))
  .apply(
    // ReduceFunction (compute sum and count)
    (x: (String, Double, Long), y: (String, Double, Long)) => 
      (x._1, x._2 + y._2, x._3 + y._3),
    // WindowFunction
    (key, window: TimeWindow, input: Iterable[(String, Double, Long)], out: Collector[(String, Double)]) => {
      // get first (and only) value
      val x: (String, Double, Long) = input.toIterator.next
      // compute average as sum / count
      out.collect(x._1, x._2 / x._3)
    }
  )
Run Code Online (Sandbox Code Playgroud)