在 Kafka Java 消费者客户端上,有没有办法监控健康状态而不是简单的无数据?

mju*_*rez 8 java apache-kafka kafka-consumer-api

我有一个典型的 kafka 消费者/生产者应用程序,它一直在轮询数据。有时,可能有几个小时没有数据,但有时每秒可能有数千条消息。正因为如此,应用程序被构建为始终轮询,持续时间为 500 毫秒。

但是,我注意到有时,如果 kafka 集群出现故障,消费者客户端一旦启动,就不会抛出异常,它只会在 500 毫秒时超时,并继续返回 empty ConsumerRecords<K,V>。所以,就应用而言,没有数据可以消费,而实际上整个 Kafka 集群可能无法访问,但应用本身并不知道。

我检查了文档,除了可能每次都关闭连接并订阅主题之外,我找不到验证消费者健康的方法,但我真的不想在长时间运行的应用程序上这样做。

在轮询时验证消费者是否活跃和健康的最佳方法是什么,理想情况下来自同一个线程/客户端对象,以便应用程序可以区分无数据和无法访问的 kafka 集群情况?

Aja*_*ary 1

我确信这不是实现您所寻求的最佳方式。

但我在应用程序中实现的一种简单方法是在应用程序中维护一个静态计数器emptyRecordSetReceived来指示. 每当我收到轮询操作设置的空记录时,我都会递增该计数器。

在应用程序的 Metric 注册表的帮助下,该计数器定期(例如每分钟)发送到 Graphite。

现在假设您知道该应用程序无法使用该消息的最长时间范围。例如,说6小时。鉴于您每 500 毫秒轮询一次,您知道如果我们在 6 小时内没有收到消息,计数器将增加

2 poll in 1 second * 60 seconds * 60 minutes * 6 hours = 43200.
Run Code Online (Sandbox Code Playgroud)

我们根据报告给 Graphite 的计数器值进行了警报检查。这个指标曾经让我知道这是应用程序的真正问题还是代理或生产者方面的其他问题。

这只是我在某种程度上解决这个用例的天真的方法。我很想知道在不维护这些计数器的情况下它是如何实际完成的。