如何统计Apache Flink在给定时间窗口内处理的记录数

Ani*_*Das 3 apache-flink flink-streaming

在flink中定义一个时间窗口后如下:

val lines = socket.timeWindowAll(Time.seconds(5))
Run Code Online (Sandbox Code Playgroud)

如何计算该特定 5 秒窗口内的记录数?

Fab*_*ske 6

执行计数聚合的最有效方法是ReduceFunction. 然而,reduce有一个限制,即输入和输出类型必须相同。Int因此,在应用窗口之前,您必须将输入转换为:

val socket: DataStream[(String)] = ???

val cnts: DataStream[Int] = socket
  .map(_ => 1)                    // convert to 1
  .timeWindowAll(Time.seconds(5)) // group into 5 second windows
  .reduce( (x, y) => x + y)       // sum 1s to count
Run Code Online (Sandbox Code Playgroud)