Dav*_*son 4 apache-flink flink-streaming
这个问题涵盖了如何使用 Flink SQL 对乱序流进行排序,但我更愿意使用 DataStream API。一种解决方案是使用 ProcessFunction 来做到这一点,它使用 PriorityQueue 来缓冲事件,直到水印表明它们不再乱序为止,但是这与 RocksDB 状态后端的性能很差(问题是对 PriorityQueue 的每次访问都会需要整个 PriorityQueue 的 ser/de)。无论使用哪个状态后端,我如何有效地做到这一点?
更好的方法(或多或少是 Flink 的 SQL 和 CEP 库在内部完成的)是在 MapState 中缓冲乱序流,如下所示:
如果您要独立地对每个键进行排序,则首先对流进行键控。否则,对于全局排序,通过常量对流进行键控,以便您可以使用 KeyedProcessFunction 来实现排序。
在该open进程函数的方法中,实例化一个 MapState 对象,其中键是时间戳,值是具有相同时间戳的流元素列表。
在onElement方法中:
当onTimer被调用时,映射中这个时间戳的条目准备作为排序流的一部分被释放——因为当前水印现在表明所有早期的事件都应该已经被处理。不要忘记在向下游发送事件后清除地图中的条目。
| 归档时间: |
|
| 查看次数: |
1088 次 |
| 最近记录: |