我想在我的 Flink 工作中限制 Kafka 消费者。
查看 Flink 1.12 的源代码,我发现FlinkConnectorRateLimiter和GuavaFlinkConnectorRateLimiter。但我找不到任何将此速率限制器连接到FlinkKafkaConsumer.
Flink 1.12 中如何实现 Kafka 的速率限制?
FlinkConnectorRateLimiter旧版 Kafka Consumer (flink-connector-kafka-0.10) 可用,但在 Flink 1.12 中已被删除。当前的kafka消费者不提供速率限制。
请参阅此邮件列表线程 - https://lists.apache.org/thread/j7kw131jn0ozmrj763j0hr87b1rj7jop - 进行一些讨论。简而言之,一旦完成了对背压和事件时间偏差下的检查点的正在进行的改进,速率限制就不再具有任何吸引力,因此实际上没有任何兴趣添加对速率限制的支持。
然而,上面的邮件列表线程确实包含一个示例,展示如何通过扩展FlinkKafkaConsumer到 overrideemitRecord和来自己实现 Kafka 的速率限制emitRecordWithTimestamp。
请注意,您应该小心,不要阻塞检查点,这意味着您应该避免在主处理线程中休眠。反序列化模式在另一个线程中运行,因此这是进行速率限制的最佳位置。
| 归档时间: |
|
| 查看次数: |
3400 次 |
| 最近记录: |