ice*_*l09 8 java spring apache-kafka spring-boot spring-kafka
我有类似下面的东西,效果很好,但我更喜欢在不发送任何消息的情况下检查运行状况(不仅检查套接字连接)。我知道 Kafka 有像 KafkaHealthIndicator 这样开箱即用的东西,有人有使用它的经验或例子吗?
\n public class KafkaHealthIndicator implements HealthIndicator {\n private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);\n\n private KafkaTemplate<String, String> kafka;\n\n public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {\n this.kafka = kafka;\n }\n\n @Override\n public Health health() {\n try {\n kafka.send("kafka-health-indicator", "\xe2\x9d\xa5").get(100, TimeUnit.MILLISECONDS);\n } catch (InterruptedException | ExecutionException | TimeoutException e) {\n return Health.down(e).build();\n }\n return Health.up().build();\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n
Mic*_*ksa 11
为了触发健康指标,请从未来对象之一检索数据,否则UP当 Kafka 关闭时指标甚至会出现!
当 Kafka 未连接时 future.get() 会抛出异常,进而设置此指示器down。
@Configuration
public class KafkaConfig {
@Autowired
private KafkaAdmin kafkaAdmin;
@Bean
public AdminClient kafkaAdminClient() {
return AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
@Bean
public HealthIndicator kafkaHealthIndicator(AdminClient kafkaAdminClient) {
final DescribeClusterOptions options = new DescribeClusterOptions()
.timeoutMs(1000);
return new AbstractHealthIndicator() {
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
DescribeClusterResult clusterDescription = kafkaAdminClient.describeCluster(options);
// In order to trip health indicator DOWN retrieve data from one of
// future objects otherwise indicator is UP even when Kafka is down!!!
// When Kafka is not connected future.get() throws an exception which
// in turn sets the indicator DOWN.
clusterDescription.clusterId().get();
// or clusterDescription.nodes().get().size()
// or clusterDescription.controller().get();
builder.up().build();
// Alternatively directly use data from future in health detail.
builder.up()
.withDetail("clusterId", clusterDescription.clusterId().get())
.withDetail("nodeCount", clusterDescription.nodes().get().size())
.build();
}
};
}
}
Run Code Online (Sandbox Code Playgroud)
使用 AdminClient API 通过描述集群和/或您将与之交互的主题来检查集群的运行状况,并验证这些主题是否具有所需数量的同步副本,例如
Kafka 有像 KafkaHealthIndicator 这样开箱即用的东西
事实并非如此。Spring 的 Kafka 集成可能
| 归档时间: |
|
| 查看次数: |
14588 次 |
| 最近记录: |