如何使用 Flink 对乱序事件时间流进行排序

Dav*_*son 4 apache-flink flink-streaming

这个问题涵盖了如何使用 Flink SQL 对乱序流进行排序,但我更愿意使用 DataStream API。一种解决方案是使用 ProcessFunction 来做到这一点,它使用 PriorityQueue 来缓冲事件,直到水印表明它们不再乱序为止,但是这与 RocksDB 状态后端的性能很差(问题是对 PriorityQueue 的每次访问都会需要整个 PriorityQueue 的 ser/de)。无论使用哪个状态后端,我如何有效地做到这一点?

Dav*_*son 7

更好的方法(或多或少是 Flink 的 SQL 和 CEP 库在内部完成的)是在 MapState 中缓冲乱序流,如下所示:

如果您要独立地对每个键进行排序,则首先对流进行键控。否则,对于全局排序,通过常量对流进行键控,以便您可以使用 KeyedProcessFunction 来实现排序。

在该open进程函数的方法中,实例化一个 MapState 对象,其中键是时间戳,值是具有相同时间戳的流元素列表。

onElement方法中:

  • 如果一个事件迟到了,要么删除它,要么把它发送到一个侧面输出
  • 否则,将事件附加到与其时间戳对应的地图条目
  • 为该事件的时间戳注册一个事件时间计时器

onTimer被调用时,映射中这个时间戳的条目准备作为排序流的一部分被释放——因为当前水印现在表明所有早期的事件都应该已经被处理。不要忘记在向下游发送事件后清除地图中的条目。