Flink WaterMark 和触发器 - 未在事件时间丢弃后期元素?

the*_*dog 3 watermark apache-flink flink-streaming

我对 Flink 在事件时间加水印时如何处理后期元素感到有些困惑。

我的理解是,当 Flink 读取数据流时,水印时间会在看到任何事件时间比当前水印事件时间大的数据时进行。然后,任何覆盖时间严格小于水印的窗口都会被触发驱逐(假设没有延迟允许。

但是,以这个最小的例子为例:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.apache.log4j.{Level, Logger}

object EventTimeExample {

  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  case class ExampleType(time: Long, value: Long)

  def main(args: Array[String]) {

    // Set up environment
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Example S3 path
    val simple = env.fromCollection(Seq(
      ExampleType(1525132800000L, 1),
      ExampleType(1525132800000L, 2) ,
      ExampleType(1525132920000L, 3),
      ExampleType(1525132800000L, 4)
    ))
      .assignAscendingTimestamps(_.time)

    val windows = simple
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
      .apply{
       (window, iter, collector: Collector[(Long, Long, String)]) => {
        collector.collect(window.getStart, window.getEnd, iter.map(_.value).toString())
      }
    }

    windows.print
    env.execute("TimeStampExample")
  }
}
Run Code Online (Sandbox Code Playgroud)

运行结果如下:

(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
Run Code Online (Sandbox Code Playgroud)

但是,如果我的理解是正确的,则4不应包含在此处的第一个窗口中,因为3到达值记录时应更新水印时间。

现在我认识到这是一个微不足道的例子,但不理解这一点会使理解更复杂的流程变得困难。

Dav*_*son 8

你的理解基本上是正确的,但是这里还有一些事情需要考虑。

首先,您已经使用了assignAscendingTimestamps(),只有在事件流完全有序(按时间戳)时才能使用它,而这里的情况并非如此。运行此应用程序时,您应该会看到此警告:

WARN  org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor  - Timestamp monotony violated: 1525132800000 < 1525132920000
Run Code Online (Sandbox Code Playgroud)

这里起作用的另一个因素是,AscendingTimestampExtractor不会为每个通过的流元素更新当前的水印。这是一个周期性水印生成器的例子,它会Watermarkn毫秒将 a注入到流中,其中 n 由 定义ExecutionConfig.setAutoWatermarkInterval(...),默认为 200 毫秒。这就是事件#4 潜入第一个窗口的方式。

为了获得您期望的结果,您可以实现一个标点水印生成器,配置为为每个事件生成一个水印:

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[ExampleType] {
  override def extractTimestamp(element: ExampleType, previousElementTimestamp: Long): Long = {
    element.time
  }

  override def checkAndGetNextWatermark(lastElement: ExampleType, extractedTimestamp: Long): Watermark = {
    new Watermark(extractedTimestamp)
  }
}
Run Code Online (Sandbox Code Playgroud)

然后你会像这样使用它:

val simple = env.fromCollection(Seq(
  ExampleType(1525132800000L, 1),
  ExampleType(1525132800000L, 2) ,
  ExampleType(1525132920000L, 3),
  ExampleType(1525132800000L, 4)
))
  .assignTimestampsAndWatermarks(new PunctuatedAssigner)
Run Code Online (Sandbox Code Playgroud)

现在您的示例产生以下结果:

(1525132800000,1525132860000,List(1, 2))
(1525132920000,1525132980000,List(3))
Run Code Online (Sandbox Code Playgroud)

事件 #4 已被删除,因为它迟到了。这可以通过放松水印生成器来调整,以适应一定程度的无序。例如,

override def checkAndGetNextWatermark(lastElement: ExampleType, extractedTimestamp: Long): Watermark = {
  new Watermark(extractedTimestamp - 200000)
}
Run Code Online (Sandbox Code Playgroud)

然后产生这些结果:

(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
Run Code Online (Sandbox Code Playgroud)

或者您可以配置窗口以允许延迟事件

val windows = simple
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
  .allowedLateness(Time.seconds(200))
  ...
Run Code Online (Sandbox Code Playgroud)

然后导致第一个窗口触发两次:

(1525132800000,1525132860000,List(1, 2))
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
Run Code Online (Sandbox Code Playgroud)

请注意,由于处理水印会带来一些开销,因此您通常不希望以这种方式使用标点水印(每个事件都有一个水印)。对于大多数应用程序,基于 a 的周期性水印BoundedOutOfOrdernessTimestampExtractor是更好的选择。