Sow*_*a V 5 apache-flink flink-cep
我有一个场景,如果第二个事件在x秒内没有跟随第一个事件,我必须改变状态.例如,用户未在100分钟内注销,请认为他处于无效状态.如何使用当前的模式操作来设计它?
由于这已经实施,我想为那些来这里寻找答案的人回答这个问题。
从 Flink 1.0.0 开始,这可以通过处理超时模式来完成,例如,如果您的 CEP 模式是这样的:
部分来自Flink 网站的示例(1.2 和 1.3 之间有一些重大变化,请相应地调整您的代码,此答案侧重于 1.3)
模式描述: - 在 10 秒内获取“错误”类型的第一个事件,然后是“严重”类型的第二个事件事件
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("error");
}
}).followedBy("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("critical");
}
}).within(Time.seconds(10));
PatternStream<BAMEvent> patternStream = CEP.pattern(inputStream, pattern)
DataStream<Either<String, String>> result = patternStream.select(new PatternTimeoutFunction<Event, String>() {
@Override
public String timeout(Map<String, List<Event>> map, long l) throws Exception {
return map.toString() +" @ "+ l;
}
}, new PatternSelectFunction<Event, String>() {
@Override
public String select(Map<String, List<Event>> map) throws Exception {
return map.toString();
}
});
Run Code Online (Sandbox Code Playgroud)
对于这种情况,如果用户在 100 分钟后仍未注销,则由于相应的事件不会到达,这将导致模式超时,部分事件(启动事件)将在 PatternTimeoutFunction 中捕获。
归档时间: |
|
查看次数: |
561 次 |
最近记录: |