如何在 Flink 流处理窗口中收集延迟数据

Soh*_*ani 1 stream-processing windowing apache-flink

假设我有一个数据流,其中包含事件时间数据。我想在 8 毫秒的窗口时间内收集输入数据流并减少每个窗口数据。我使用以下代码来做到这一点:

aggregatedTuple
          .keyBy( 0).timeWindow(Time.milliseconds(8))
          .reduce(new ReduceFunction<Tuple2<Long, JSONObject>>()
Run Code Online (Sandbox Code Playgroud)

Point数据流的关键是处理时间的时间戳映射到处理毫秒的时间戳的后8个约数,例如1531569851297将映射到1531569851296

但数据流可能延迟到达并进入错误的窗口时间。例如,假设我将窗口时间设置为 8 毫秒。如果数据按顺序进入 Flink 引擎或至少延迟小于窗口时间(8 毫秒),这将是最好的情况。但假设数据流事件时间(也是数据流中的一个字段)已到达,延迟时间为 30 毫秒。所以它会进入错误的窗口,我想如果我检查每个数据流的事件时间,因为它想进入窗口,我可以过滤这么晚的数据。所以我有两个问题:

  • 如何在数据流想要进入窗口时过滤数据流并检查数据是否在窗口的正确时间戳创建?
  • 如何将如此晚的数据收集到变量中以对它们进行一些处理?

Dav*_*son 6

Flink 有两个不同的、相关的抽象,处理带有事件时间时间戳的流上计算窗口分析的不同方面:水印允许的延迟

首先是水印,每当处理事件时间数据时(无论您是否使用 Windows),水印都会发挥作用。水印向 Flink 提供有关事件时间进度的信息,并为应用程序编写者提供一种处理无序数据的方法。水印随数据流流动,每个水印标记流中的一个位置并带有时间戳。水印充当一个断言,即在流中的该点,流现在(可能)已完成到该时间戳,或者换句话说,水印之后的事件不太可能来自水印指示的时间之前。水印。最常见的水印策略是使用BoundedOutOfOrdernessTimestampExtractor,它假设事件在某个固定的、有界的延迟内到达。

现在提供了延迟的定义 - 遵循水印且时间戳小于水印时间戳的事件被视为延迟

window API 提供了允许迟到的概念,默认设置为零。如果允许的延迟大于零,则事件时间窗口的默认触发器将接受延迟事件到其适当的窗口中,直到允许延迟的限制。窗口操作将在通常时间触发一次,然后针对每个延迟事件再次触发,直到允许的延迟间隔结束。此后,迟到的事件将被丢弃(或收集到侧输出(如果已配置))。

How can I filter data stream as it wants to enter the window and check 
if the data created at the right timestamp for the window?
Run Code Online (Sandbox Code Playgroud)

Flink 的窗口分配器负责将事件分配给适当的窗口——正确的事情会自动发生。将根据需要创建新的窗口实例。

How can I gather such late data in a variable to do some processing on them?
Run Code Online (Sandbox Code Playgroud)

您可以在水印方面足够慷慨,以避免出现任何迟到的数据,和/或将允许的迟到时间配置得足够长以适应迟到的事件。但请注意,Flink 将被迫保持所有仍在接受延迟事件的窗口打开,这将延迟旧窗口的垃圾收集,并可能消耗大量内存。

请注意,本讨论假设您想要使用时间窗口,例如您正在使用的 8 毫秒长窗口。Flink 还支持计数窗口(例如将事件分组为 100 个批次)、会话窗口和自定义窗口逻辑。例如,如果您使用计数窗口,则水印和延迟不会发挥任何作用。

如果您想要分析每个键的结果,请在应用窗口之前使用 keyBy 按键(例如,按 userId)对流进行分区。例如

stream
  .keyBy(e -> e.userId)
  .timeWindow(Time.seconds(10))
  .reduce(...)
Run Code Online (Sandbox Code Playgroud)

将为每个 userId 生成单独的结果。

更新:请注意,在 Flink 的最新版本中,Windows 现在可以将延迟事件收集到侧面输出。

一些相关文档:

活动时间和
允许迟到的水印