Ama*_*ngh 2 apache-flink flink-cep
我的问题是,如果我们有两个原始事件流,即烟雾和温度,并且我们想通过将运算符应用于原始流来查明复杂事件(即火灾)是否发生,我们可以在 Flink 中执行此操作吗?
我问这个问题是因为到目前为止我所看到的 Flink CEP 的所有示例都只包含一个输入流。如果我错了,请纠正我。
简短的回答- 是的,您可以根据来自不同流源的事件类型读取和处理多个流和触发规则。
长答案- 我有一个有点类似的要求,我的答案是基于这样的假设:您正在阅读来自不同卡夫卡主题的不同流。
读取在单个源中流式传输不同事件的不同主题:
FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
Arrays.asList("topicStream1", "topicStream2", "topicStream3"),
new StringSerializerToEvent(),
props);
kafkaSource.assignTimestampsAndWatermarks(new
TimestampAndWatermarkGenerator());
DataStream<BAMEvent> events = env.addSource(kafkaSource)
.filter(Objects::nonNull);
Run Code Online (Sandbox Code Playgroud)
序列化器读取数据并将其解析为通用格式 - 例如。
@Data
public class BAMEvent {
private String keyid; //If key based partitioning is needed
private String eventName; // For different types of events
private String eventId; // Any other field you need
private long timestamp; // For event time based processing
public String toString(){
return eventName + " " + timestamp + " " + eventId + " " + correlationID;
}
}
Run Code Online (Sandbox Code Playgroud)
之后,事情就非常简单了,根据事件名称定义规则并比较事件名称来定义规则(您也可以定义复杂的规则,如下所示):
Pattern.<BAMEvent>begin("first")
.where(new SimpleCondition<BAMEvent>() {
private static final long serialVersionUID = 1390448281048961616L;
@Override
public boolean filter(BAMEvent event) throws Exception {
return event.getEventName().equals("event1");
}
})
.followedBy("second")
.where(new IterativeCondition<BAMEvent>() {
private static final long serialVersionUID = -9216505110246259082L;
@Override
public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {
if (!secondEvent.getEventName().equals("event2")) {
return false;
}
for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
if (secondEvent.getEventId = firstEvent.getEventId()) {
return true;
}
}
return false;
}
})
.within(withinTimeRule);
Run Code Online (Sandbox Code Playgroud)
我希望这能让您想到将一个或多个不同的流集成在一起。
| 归档时间: |
|
| 查看次数: |
1989 次 |
| 最近记录: |