Spring Boot:Kafka健康指标

ice*_*l09 8 java spring apache-kafka spring-boot spring-kafka

我有类似下面的东西,效果很好,但我更喜欢在不发送任何消息的情况下检查运行状况(不仅检查套接字连接)。我知道 Kafka 有像 KafkaHealthIndicator 这样开箱即用的东西,有人有使用它的经验或例子吗?

\n
   public class KafkaHealthIndicator implements HealthIndicator {\n   private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);\n\n   private KafkaTemplate<String, String> kafka;\n\n   public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {\n   this.kafka = kafka;\n   }\n\n  @Override\n  public Health health() {\n  try {\n     kafka.send("kafka-health-indicator", "\xe2\x9d\xa5").get(100, TimeUnit.MILLISECONDS);\n  } catch (InterruptedException | ExecutionException | TimeoutException e) {\n      return Health.down(e).build();\n  }\n  return Health.up().build();\n }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

Mic*_*ksa 11

为了触发健康指标,请从未来对象之一检索数据,否则UP当 Kafka 关闭时指标甚至会出现!

当 Kafka 未连接时 future.get() 会抛出异常,进而设置此指示器down

@Configuration
public class KafkaConfig {

    @Autowired
    private KafkaAdmin kafkaAdmin;

    @Bean
    public AdminClient kafkaAdminClient() {
        return AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }

    @Bean
    public HealthIndicator kafkaHealthIndicator(AdminClient kafkaAdminClient) {
        final DescribeClusterOptions options = new DescribeClusterOptions()
            .timeoutMs(1000);

        return new AbstractHealthIndicator() {
            @Override
            protected void doHealthCheck(Health.Builder builder) throws Exception {
                DescribeClusterResult clusterDescription = kafkaAdminClient.describeCluster(options);

                // In order to trip health indicator DOWN retrieve data from one of
                // future objects otherwise indicator is UP even when Kafka is down!!!
                // When Kafka is not connected future.get() throws an exception which 
                // in turn sets the indicator DOWN.
                clusterDescription.clusterId().get();
                // or clusterDescription.nodes().get().size()
                // or clusterDescription.controller().get();

                builder.up().build();

                // Alternatively directly use data from future in health detail.
                builder.up()
                        .withDetail("clusterId", clusterDescription.clusterId().get())
                        .withDetail("nodeCount", clusterDescription.nodes().get().size())
                        .build();
            }
        };
    }

}
Run Code Online (Sandbox Code Playgroud)

  • 我不确定这个解决方案是否特定于某个版本,但就我而言,它不起作用。当您在 spring-boot 服务启动后停止 Kafka 服务时,它仍然继续返回“UP”。我不确定为什么会发生这种情况(我猜测与管理客户端始终使用 KafkaFuture 对象相关的东西?)所以我使用了 `kafkaAdminClient.listTopics(new ListTopicsOptions().timeoutMs(5000)).names()。 get(5000, MILLISECONDS)` 来强制等待 KafkaFuture 的结果,现在,只要 Kafka 服务关闭,它就会抛出一个 `TimeoutException` 。 (6认同)

cri*_*007 0

使用 AdminClient API 通过描述集群和/或您将与之交互的主题来检查集群的运行状况,并验证这些主题是否具有所需数量的同步副本,例如

Kafka 有像 KafkaHealthIndicator 这样开箱即用的东西

事实并非如此。Spring 的 Kafka 集成可能