ish*_*han 9 java apache-kafka spring-boot spring-kafka
我有一个 spring boot (2.1.6) 应用程序,它既消费又向(组织范围内的)公共 kafka 实例生成消息。我正在尝试使用 spring 执行器在我的应用程序中为这个 kafka 代理实施健康检查,但我面临着一系列与性能和日志记录相关的问题。spring boot 2.0 中内置了一个健康指示器,但由于一些明显的问题,他们将其删除。
这是我实现的健康检查类:
@Component
public class KafkaHealthCheck implements HealthIndicator {
private static final Logger logger = LoggerFactory.getLogger(KafkaHealthCheck.class);
private KafkaAdmin kafkaAdmin;
private Map<String, Object> kafkaConfig;
@Value("${application.topic}")
private String topicName;
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
public KafkaHealthCheck(KafkaAdmin kafkaAdmin) {
this.kafkaAdmin = kafkaAdmin;
}
@PostConstruct
public void setUpAdminClient() {
kafkaConfig = new HashMap<>();
kafkaConfig.putAll(kafkaAdmin.getConfig());
kafkaConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
}
@Override
public Health health() {
Long start = System.currentTimeMillis();
try (AdminClient adminClient = AdminClient.create(kafkaConfig)) {
DescribeClusterOptions describeClusterOptions = new DescribeClusterOptions().timeoutMs(2000);
adminClient.describeCluster(describeClusterOptions);
adminClient.describeConsumerGroups(List.of("topic")).all()
.get(2, TimeUnit.SECONDS);
Map<String, TopicDescription> topicDescriptionMap = adminClient
.describeTopics(List.of(topicName)).all().get(2, TimeUnit.SECONDS);
List<TopicPartitionInfo> partitions = topicDescriptionMap.get(topicName)
.partitions();
if (partitions == null || partitions.isEmpty()) {
logger.warn(String
.format("Kafka healthcheck failed - No partition found for topic: %s", topicName));
return Health.down()
.withDetail("Kafka healthcheck failed", "No partition found for topic: " + topicName)
.build();
} else {
if (partitions.stream().anyMatch(p -> p.leader() == null)) {
logger.warn(
String.format("Kafka healthcheck failed - No partition leader found for topic: %s",
topicName));
return Health.down().withDetail("Kafka healthcheck failed",
"No partition leader found for topic: " + topicName).build();
}
}
} catch (Exception e) {
logger.warn("Kafka healthcheck failed", e);
return Health.down()
.withDetail("Kafka healthcheck failed", "Exception occurred during healthcheck").build();
}
System.out.println(System.currentTimeMillis() - start);
return Health.up().build();
}
}
Run Code Online (Sandbox Code Playgroud)
现在这些是我遇到的问题或我在这个实现中面临的问题:
1 - 除了“bootstrap.servers”之外,KafkaAdmin 注入了我拥有的所有配置(我使用的是 SSL)。我发现它org.springframework.boot.autoconfigure.kafka.KafkaProperties
有localhost:9092
作为默认值,它在某种程度上不会被应用程序配置覆盖,而它对消费者和生产者来说工作正常。我不知道为什么会这样,因此我必须在这里手动设置。
2 - 我添加了超时DescribeClusterOptions
,describeConsumerGroups
但这些超时似乎被完全忽略了。如果我手动关闭代理,则运行状况检查大约需要几分钟才能报告错误。
3 - 由于 bootstrap.servers 错误,当我实际部署应用程序时,它几乎杀死了我的日志服务器,因为org.apache.kafka.clients.NetworkClient
说Connection to node -1 could not be established. Broker may not be available.
. 我怎样才能阻止它再次发生?即使在操作过程中经纪人宕机的情况下。
4 - 当我创建 AdminClient 时,即使是成功的健康检查也会生成很多日志行。它记录了它读取的所有配置和一堆其他语句。有没有机会最小化它?
5 - 总的来说,这很慢。我试图计算仅运行此运行状况检查所需的时间,平均约为 1.5 秒。有机会优化吗?
归档时间: |
|
查看次数: |
3314 次 |
最近记录: |