Pri*_*der 7 apache-flink flink-streaming
我有以下格式的数据,
SIP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | INVITE RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 0 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 1 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 2 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58: 58 IST 2016 | 3 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 4 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 5 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二08月16日16:58:58 IST 2016 | 6 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二08月16日16:58:58 IST 2016 | 7 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二08 16:58:58 IST 2016 | 8 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 9 SIP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | BYE
我希望我的窗口在SIP-INVITE遇到消息时启动,并在遇到消息时触发事件SIP-BYE,执行一些聚合.
我该怎么做呢?该SIP-INVITE消息在给定用户的任何时间点出现,并且我可能还有多个SIP-INVITE消息同时出现在多个用户之间.
我认为您可以通过用户键入的全局窗口来解决您的用例。全局窗口收集每个键的所有数据,并将触发和清除窗口的责任推给用户定义的Trigger函数。
全局窗口定义如下:
val input: DataStream[(String, Int, String)] = ??? // (userId, value, marker)
val agg = input
// one global window per user (handles overlapping SIP-INVITE events).
.keyBy(_._1)
// collect all data for each user until the trigger fires and purges the window.
.window(GlobalWindows.create())
// you have to implement a custom trigger which reacts on the marker.
.trigger(new YourCustomTrigger())
// the window function computes your aggregation.
.apply(new YourWindowFunction())
Run Code Online (Sandbox Code Playgroud)
我认为执行以下操作的触发器应该有效(假设SIP-INVITE事件始终启动会话)。该Trigger.onElement()方法应该检查该SIP-BYE字段并触发窗口评估并清除窗口,即 return TriggerResult.FIRE_AND_PURGE。这将调用评估函数并删除窗口状态。
请注意,如果您想支持乱序事件,则需要特别注意(在这种情况下,您应该将事件时间计时器设置为关闭元素的时间戳,以确保接收到该时间戳之前的所有数据)。如果有数据应该被丢弃,因为它不是“之间” SIP-INVITE,SIP-BYE您也需要处理它。
有关详细信息,请参阅全局窗口和触发器的文档、JavaDocs[Trigger][3]以及这篇博客文章。
| 归档时间: |
|
| 查看次数: |
251 次 |
| 最近记录: |