小编Ard*_*iel的帖子

如何使用java获得kafka滞后

我目前开发了一个代码来显示主题、分区和日志偏移量。但我目前被困在如何获得分区的滞后上。我知道有一个 kafka offset 命令可以完成这个功能,但我需要的是一个 java 代码。

public static void main(String[] args) throws Exception {
    System.out.println("START CONSUMER");final Properties props = new Properties();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    // Create the consumer using props.
    final Consumer<Long, String> consumer =  new KafkaConsumer<>(props);

    // Subscribe to the topic.
    int i = 0;
    ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
    for (i=0;i<consumer.partitionsFor(TOPIC).size();i++)
    {
        TopicPartition partitiontemp = new TopicPartition(TOPIC, i);
        partitions.add(partitiontemp);
    }
    consumer.assign(partitions);
    consumer.seekToEnd(partitions);

    for (i=0;i<consumer.partitionsFor(TOPIC).size();i++)
    {
        System.out.printf("Topic: %s …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
3388
查看次数

标签 统计

apache-kafka ×1

java ×1

kafka-consumer-api ×1