处理时间窗口不适用于 Apache Flink 中的有限数据源

Iva*_*nko 2 apache-flink flink-streaming

我正在尝试将一个非常简单的窗口函数应用于 Apache Flink 中的有限数据流(本地,无集群)。这是示例:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))

  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .trigger(ProcessingTimeTrigger.create)
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.sorted.toString())
    }
  })

  .print()

env.execute()
Run Code Online (Sandbox Code Playgroud)

在这里,我尝试将在一秒钟内到达窗口的所有元素分组,然后只打印这些组。

我假设所有元素都将在不到一秒的时间内生成并进入一个窗口,因此print(). 但是,当我运行它时,根本没有打印任何内容

如果我删除所有窗口的东西,比如

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .print()
Run Code Online (Sandbox Code Playgroud)

我看到运行后打印的元素。我也用文件源试过这个,没有区别。

我机器上的默认并行度是 6。如果我试验并行度和延迟的级别,像这样

val env = StreamExecutionEnvironment.createLocalEnvironment(2)
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .map { x => Thread.sleep(1500); x }
Run Code Online (Sandbox Code Playgroud)

我能够将一些——不是全部——元素分组,然后打印出来。

我的第一个假设是源的完成速度远快于 1 秒,并且任务在窗口的计时器触发之前关闭。调试显示已到达定时器设置行ProcessingTimeTrigger。难道所有启动的计时器不应该在任务关闭之前完成(至少这是我从代码中得到的印象)?

你能帮我理解这一点并使之更具确定性吗?

2018 年 9 月 23 日更新 #1:

我还试验了事件时间窗口而不是处理时间窗口。如果我这样做:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[String] {
    override def extractAscendingTimestamp(element: String): Long = {
      element.charAt(0).toInt
    }
  })

  .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
  .trigger(EventTimeTrigger.create)
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.toString())
    }
  })

  .print()

env.execute()
Run Code Online (Sandbox Code Playgroud)

然后再次没有打印任何内容。调试器显示onElement为每个元素调用触发器,但onEventTime从未调用过。

另外,如果我修改时间戳提取器以进行更大的步骤:

element.charAt(0).toInt * 1000
Run Code Online (Sandbox Code Playgroud)

除了最后一个之外,所有元素都被打印出来(每组一个元素,这是预期的)。

2018 年 9 月 23 日更新 #2:

此评论中回答了更新 #1 。

Dav*_*son 7

当有限源到达末尾时,如果您使用事件时间,则将注入时间戳为 Long.MAX_VALUE 的水印,这将导致所有事件时间计时器触发。但是,随着处理时间的增加,Flink 将等待所有当前正在触发的计时器完成其操作,然后退出。

正如您所怀疑的,您没有看到任何输出,因为源代码很快就完成了。

事件时间处理的确定性行为很简单;随着处理时间,它并不是真正可以实现的。

但这里有一个或多或少有效的黑客:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val s = env.fromCollection(List("a", "b", "c", "d", "e"))
val t = env.addSource((context: SourceContext[String]) => {
  while(true) {
    Thread.sleep(100)
    context.collect("dummy")
  }
})

s.union(t)
  .filter(_ != "dummy")
  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.sorted.toString())
    }
  })
  .print()

env.execute()
Run Code Online (Sandbox Code Playgroud)