Rob*_*ann 1 apache-flink flink-cep
我已经使用 Flink CEP 实现了一个匹配三个事件的模式,例如A->B->C. 在我定义了我的模式后,我生成了一个
PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);
与PatternSelectFunction这样的
patternStream.select(new MyPatternSelectFunction()).print();
这就像一个魅力,但我对所有匹配事件的事件时间感兴趣。我知道传统的 Flink 流 API 提供了丰富的功能,允许您注册 Flink 的内部延迟跟踪器,如本问题所述。我还看到 Flink 1.8RichPatternSelectFunction添加了一个新功能。但不幸的是我无法使用 Flink CEP 设置 Flink 1.8。
最后,有没有办法获取所有匹配事件的事件时间?
您不需要丰富的功能来使用 Flink 的延迟跟踪。您只需latencyTrackingInterval在 Flink 配置或 ExecutionConfig 中设置为正数即可启用它,例如,
env.getConfig().setLatencyTrackingInterval(1000);
Run Code Online (Sandbox Code Playgroud)
然后您可以在指标解决方案中或通过 REST api 观察结果(Flink Web UI 中不会报告延迟指标)。
更新:
延迟统计信息是作业指标,位于由
http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics
Run Code Online (Sandbox Code Playgroud)
可以从以下位置获取延迟指标值
http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics?get=<metric_name>
Run Code Online (Sandbox Code Playgroud)
这些指标的名称如下
latency.source_id.<ID>.operator_id.<ID>.operator_subtask_index.<SUBTASK>.<metric>
Run Code Online (Sandbox Code Playgroud)
其中 ID 标识作业图中的源节点和操作节点,在这两个节点之间测量延迟。
例如,我可以通过以下请求确定我现在正在运行的作业中的源和接收器之一之间的 95% 延迟:
http://localhost:8081/jobs/94b189a96b98b3aafaba6db6aa8b770b/metrics?get=latency.source_id.bc764cd8ddf7a0cff126f51c16239658.operator_id.fd0ee602f2fa8d310d9bd9f694e185f5.operator_subtask_index.0.latency_p95
Run Code Online (Sandbox Code Playgroud)
或者,您可以使用 ProcessFunction 在事件进入作业的 CEP 部分之前向事件添加处理时间时间戳,然后使用另一个 ProcessFunction 来测量经过的时间。