Flink Streaming:如何实现由start和end元素定义的窗口?

Pri*_*der 7 apache-flink flink-streaming

我有以下格式的数据,

SIP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | INVITE RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 0 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 1 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 2 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58: 58 IST 2016 | 3 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 4 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 5 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二08月16日16:58:58 IST 2016 | 6 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二08月16日16:58:58 IST 2016 | 7 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二08 16:58:58 IST 2016 | 8 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 9 SIP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | BYE

我希望我的窗口在SIP-INVITE遇到消息时启动,并在遇到消息时触发事件SIP-BYE,执行一些聚合.

我该怎么做呢?该SIP-INVITE消息在给定用户的任何时间点出现,并且我可能还有多个SIP-INVITE消息同时出现在多个用户之间.

Fab*_*ske 3

我认为您可以通过用户键入的全局窗口来解决您的用例。全局窗口收集每个键的所有数据,并将触发和清除窗口的责任推给用户定义的Trigger函数。

全局窗口定义如下:

val input: DataStream[(String, Int, String)] = ??? // (userId, value, marker)
val agg = input
  // one global window per user (handles overlapping SIP-INVITE events).
  .keyBy(_._1)
  // collect all data for each user until the trigger fires and purges the window.
  .window(GlobalWindows.create())
  // you have to implement a custom trigger which reacts on the marker.
  .trigger(new YourCustomTrigger())
  // the window function computes your aggregation.
  .apply(new YourWindowFunction())
Run Code Online (Sandbox Code Playgroud)

我认为执行以下操作的触发器应该有效(假设SIP-INVITE事件始终启动会话)。该Trigger.onElement()方法应该检查该SIP-BYE字段并触发窗口评估并清除窗口,即 return TriggerResult.FIRE_AND_PURGE。这将调用评估函数并删除窗口状态。

请注意,如果您想支持乱序事件,则需要特别注意(在这种情况下,您应该将事件时间计时器设置为关闭元素的时间戳,以确保接收到该时间戳之前的所有数据)。如果有数据应该被丢弃,因为它不是“之间” SIP-INVITESIP-BYE您也需要处理它。

有关详细信息,请参阅全局窗口触发器的文档、JavaDocs[Trigger][3]以及这篇博客文章