Spring Kafka Consumer - 打印 Kafka 滞后信息

Pun*_*cky 1 java kafka-consumer-api spring-kafka

我创建了一个从主题读取的 spring kafka 消费者。有没有办法打印类似于我们打印分区信息的滞后信息?

Fel*_*ias 5

虽然没有提供源代码,但我假设您通过@KafkaListener注释实现了您的使用者。我有克服你描述利用了同样的问题org.apache.kafka.clients.consumer.Consumer接口,表示在这里。可以在@KafkaListener注解下的consumer方法中声明为参数。该接口提供了metrics()方法,该方法包含存储在records-max-lag属性中的消费者滞后信息。

private static final Logger LOGGER = LoggerFactory.getLogger(YourClass.class);

@KafkaListener(topics = "your-topic", groupId = "your-group-id", id = "your-client-id", containerFactory = "kafkaListenerContainerFactory")
public void listenerExample(List<String> msgs, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment ack,
        Consumer<?, ?> consumer) {



    String lag = consumer.metrics().values().stream().filter(m -> "records-lag-max".equals(m.metricName().name()))
            .map(Metric::metricValue).map(Object::toString).distinct()
            .collect(Collectors.joining("", "[Kafka current consumer lag]", " records"));


    LOGGER.info(lag);


}
Run Code Online (Sandbox Code Playgroud)

在这种情况下,我明确选择了记录滞后最大属性。您可以选择任何其他消费者指标,该列表位于Confluent Docs

上面的代码片段将具有以下输出: [Kafka current consumer lag] X records 其中 X 是此窗口中任何分区的记录数方面的最大滞后。

重要的:

我使用的版本2.3.3.RELEASE的的春天卡夫卡

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.3.RELEASE</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)