Pun*_*cky 1 java kafka-consumer-api spring-kafka
我创建了一个从主题读取的 spring kafka 消费者。有没有办法打印类似于我们打印分区信息的滞后信息?
虽然没有提供源代码,但我假设您通过@KafkaListener注释实现了您的使用者。我有克服你描述利用了同样的问题org.apache.kafka.clients.consumer.Consumer接口,表示在这里。可以在@KafkaListener注解下的consumer方法中声明为参数。该接口提供了metrics()方法,该方法包含存储在records-max-lag属性中的消费者滞后信息。
private static final Logger LOGGER = LoggerFactory.getLogger(YourClass.class);
@KafkaListener(topics = "your-topic", groupId = "your-group-id", id = "your-client-id", containerFactory = "kafkaListenerContainerFactory")
public void listenerExample(List<String> msgs, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment ack,
Consumer<?, ?> consumer) {
String lag = consumer.metrics().values().stream().filter(m -> "records-lag-max".equals(m.metricName().name()))
.map(Metric::metricValue).map(Object::toString).distinct()
.collect(Collectors.joining("", "[Kafka current consumer lag]", " records"));
LOGGER.info(lag);
}
Run Code Online (Sandbox Code Playgroud)
在这种情况下,我明确选择了记录滞后最大属性。您可以选择任何其他消费者指标,该列表位于Confluent Docs。
上面的代码片段将具有以下输出:
[Kafka current consumer lag] X records
其中 X 是此窗口中任何分区的记录数方面的最大滞后。
我使用的版本2.3.3.RELEASE的的春天卡夫卡库
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.3.RELEASE</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2496 次 |
| 最近记录: |