Pav*_*bek 6 apache-kafka apache-flink
我有一个关于 Flink Kafka Consumer (FlinkKafkaConsumer09) 的问题。我一直在使用这个版本的连接器:flink-connector-kafka-0.9_2.11-1.1.2(连接器版本是0.9,akka版本是2.11,flink版本是1.1.2)
我在 5 分钟的滚动窗口内从 kafka 收集通信数据。据我所知,窗口与系统时间对齐(例如窗口以 12:45、12:50、12:55、13:00 等结束)窗口关闭后,其记录将被处理/聚合并通过 Sink 操作符发送到数据库。
我的程序的简化版本:
env.addSource(new FlinkKafkaConsumer09<>(topicName,jsonMapper, properties))
.keyBy("srcIp", "dstIp", "dstPort")
.window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MINUTES)))
.apply(new CounterSum<>())
.addSink(new DbSink(...));
Run Code Online (Sandbox Code Playgroud)
但是我需要在卡夫卡中提交偏移量。据我所知,FlinkKafkaConsumer09 中的唯一方法是打开检查点。我这样做:
env.enableCheckpointing(300000); // 5 minutes
Run Code Online (Sandbox Code Playgroud)
检查点存储所有操作员的状态。检查点完成后,将偏移量提交到kafka。我的检查点通过 FsStateBackend 存储在任务管理器系统文件结构中(第一个问题 - 较旧的检查点数据未删除,我看到为此报告了一些错误)。第二个问题是检查点何时触发。如果在窗口开始时触发,生成的检查点文件很小,另一方面,在窗口关闭之前触发,生成的状态很大(例如50MB),因为该窗口中已经有很多通信记录。检查点过程通常需要不到1-2秒,但是当窗口关闭后触发检查点并且在处理聚合和DB接收器时,检查点过程需要45秒。
但重点是我根本不需要状态检查点。我所需要的只是在窗口关闭、处理并将结果数据沉入数据库(或在另一个窗口的开头)后将偏移量提交给kafka。如果发生故障转移,flink 将从 kafka 获取最后的偏移量,并再次读取最后 5 分钟间隔的数据。由于上次失败的结果未发送到数据库,因此不会有重复的数据发送到数据库,并且重新读取最后 5 分钟间隔不会产生任何开销。
所以基本上我有两个问题:
有什么方法可以实现关闭检查点并仅提交如上所述的偏移量吗?
如果没有,有什么方法可以将检查点与窗口的开始对齐吗?我阅读了 flink 文档 - 有一个名为保存点(即手动检查点)的功能,但它是从命令行使用的。我需要在窗口启动时从代码中调用保存点 - 状态会很小并且检查点过程会很快。
| 归档时间: |
|
| 查看次数: |
1195 次 |
| 最近记录: |