标签: flink-cep

Apache Flink 延迟处理某些事件

我需要延迟处理某些事件。

例如。我有三个事件(发布在 Kafka 上):

  • A(id:1,重试时间:现在)
  • B(id:2,重试时间:10分钟后)
  • C(id:3,重试时间:现在)

我需要立即处理记录A和C,而记录B需要十分钟后处理。在 Apache Flink 中这是否可行?

到目前为止,无论我研究过什么,“触发器”似乎可能有助于在 Flink 中实现它,但尚未能够正确实现它。

我也查看了 Kafka 文档,但看起来不太可行。

apache-kafka apache-flink flink-streaming flink-cep flink-sql

3
推荐指数
1
解决办法
1957
查看次数

Flink完全一次消息处理

我已经设置了一个Flink 1.2独立集群,其中包含2个JobManagers和3个TaskManagers,我正在使用JMeter通过生成Kafka消息/事件对其进行加载测试,然后对其进行处理.处理作业在TaskManager上运行,通常需要~15K事件/秒.
该作业已设置EXACTLY_ONCE检查点,并将状态和检查点持久保存到Amazon S3.如果我关闭运行作业的TaskManager,它需要几秒钟,然后在另一个TaskManager上恢复作业.该作业主要记录连续整数的事件ID(例如,从0到1200000).
当我检查TaskManager上的输出时,我关闭了最后一次计数,例如500000,然后当我在另一个TaskManager上检查恢复作业的输出时,它以~400000开始.这意味着~100K的重复事件.这个数字取决于测试的速度可以更高或更低.
不确定我是否遗漏了一些东西,但我希望在重新启动不同的TaskManager后,该作业会显示下一个连续的数字(如500001).
有谁知道为什么这发生/额外的设置我必须配置,以获得一次?

apache-flink flink-streaming flink-cep

2
推荐指数
1
解决办法
1649
查看次数

是否可以在 apache flink CEP 中处理多个流?

我的问题是,如果我们有两个原始事件流,即烟雾温度,并且我们想通过将运算符应用于原始流来查明复杂事件(即火灾)是否发生,我们可以在 Flink 中执行此操作吗?

我问这个问题是因为到目前为止我所看到的 Flink CEP 的所有示例都只包含一个输入流。如果我错了,请纠正我。

apache-flink flink-cep

2
推荐指数
1
解决办法
1989
查看次数

Apache Flink:如何根据事件类型将事件接收到不同的Kafka主题?

我想知道是否可以使用Flink Kafka接收器根据事件的类型编写不同主题的事件?假设我们有不同类型的事件:通知,消息和好友请求。我们希望将这些事件流式传输到不同的主题,这些主题分别是:notification-topic,messages-topic,friendsRequest-topic。

我尝试了多种方法来解决此问题,但仍然找不到正确的解决方案。我听说可以使用,ProcessFunction但如何将其与我的问题联系起来?

streaming apache-flink flink-streaming flink-cep

1
推荐指数
1
解决办法
484
查看次数

使用 Flink CEP 测量事件时间延迟

我已经使用 Flink CEP 实现了一个匹配三个事件的模式,例如A->B->C. 在我定义了我的模式后,我生成了一个

PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);

PatternSelectFunction这样的

patternStream.select(new MyPatternSelectFunction()).print();

这就像一个魅力,但我对所有匹配事件的事件时间感兴趣。我知道传统的 Flink 流 API 提供了丰富的功能,允许您注册 Flink 的内部延迟跟踪器,如本问题所述。我还看到 Flink 1.8RichPatternSelectFunction添加了一个新功能。但不幸的是我无法使用 Flink CEP 设置 Flink 1.8。

最后,有没有办法获取所有匹配事件的事件时间?

apache-flink flink-cep

1
推荐指数
1
解决办法
780
查看次数

哪个设置检查点间隔(毫秒)?

每个人。
请帮我。
我编写了 apache flink streraming 作业,它从 apache kafka 读取 json 消息(几秒钟内 500-1000 条消息),在 POJO 中反序列化它们并执行一些操作(filter-keyby-process-sink)。我使用具有 ExactlyOnce 语义的 RocksDB 状态后端。但我不明白我需要设置哪个检查点间隔?
有些论坛的人写的时间大多是 1000 或 5000 毫秒。我尝试将间隔设置为10ms、100ms、500ms、1000ms、5000ms。我没有注意到任何差异。

apache-flink flink-streaming flink-cep

0
推荐指数
1
解决办法
2592
查看次数