Apache Kafka Consumer 停止消费消息

Val*_*riy 6 java apache-kafka kafka-consumer-api

我有一个卡夫卡消费者的问题。我使用新的 Kafka 和新的 Consumer Java API。从快速入门开始,它是最简单的 Kafka 和 Zookeeper 。

我启动应用程序,在我的消费者从主题消费消息几次后,它停止接收。

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class MyKC{

    public MyKC(){
        Properties config = new Properties();
        config.put("zookeeper.connect", "localhost:2181");
        config.put("group.id", "default");
        config.put("bootstrap.servers", "localhost:9092");
        config.put("enable.auto.commit", "true");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);

        TopicPartition tp = new TopicPartition("connect-test", 0);
        List<TopicPartition> ltp = Arrays.asList(tp);
        consumer.assign(ltp);
        consumer.seekToEnd(ltp);
        ConsumerRecords<String, String> records;

        while(true){
            records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                System.out.println(record.value());
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

当我附加到我的代码计数消息数时:

while(true){
    records = consumer.poll(1000);
    System.out.print(records.count() + "; ");
}
Run Code Online (Sandbox Code Playgroud)

我看到在每次迭代中,消费者都不会收到帖子。它看起来像这样:

1; 1; 1; 0; 0; 30; 70; 1; 1; 21; 16; 2; 1; 1; 8; 49; 2; 1; 62; 35; 5; 11; 47; 2; 1; 1; 1; 1; 31; 1; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 0; 等等。

可能需要很长时间(5 分钟以上)或几秒钟,但我每次都有相同的结果。

当我重新启动消费者时,历史会重演。我确信消息会继续以每秒 100 条的速度到达主题。

有谁有想法吗?

更新

顺便说一下,如果我在这个页面上使用像描述这样的高级消费者,这个消费者有和以前一样的问题,但是直到我没有重新启动 kafka 服务器,它才从主题中获取消息。

如果我重新启动他,而不是服务器,第一个消费者(简单)会继续接收消息。

如果我使用 subscribe() 方法,如果我希望消费者收到消息,我必须重新启动 kafka 服务器。如果我使用assign() 方法,我必须只重启我的消费者,它会在一段时间内接收数据。

更新 2

关于这一点的更多数据。
如果我像这样设置消费者配置:consumer.seekToBeginning(ltp);
我的消费者在到达偏移量结束之前一直没有任何问题地接收消息。然后消费者更难接收消息,直到它停止。

如果我设置了这个设置 consumer.seekToEnd(ltp);消费者开始时最初几秒钟接收消息没有问题,然后逐渐停止。

可能是与消息偏移有关的任何问题?

更新 3

这是我对@fhussonnois 评论的回答。

抱歉,但我的英语水平不允许我即时阅读 Javadoc。如果我正确理解了这种方法的描述,poll(Long.MAX_VALUE) 使消费者继续等待 292 千年,而我们知道 Kafka 集群每秒接收 100 条数据。

现在我创建了我的脏修复并在测试中启动了它。它看起来像这样:

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class MyKC{

    private Properties config;
    private KafkaConsumer<String, String> consumer;
    private TopicPartition tp;
    private List<TopicPartition> ltp;
    private ConsumerRecords<String, String> records;
    private long offset = 0;

    public MyKC(){
        config = new Properties();
        config.put("zookeeper.connect", "localhost:2181");
        config.put("group.id", "default");
        config.put("bootstrap.servers", "localhost:9092");
        config.put("enable.auto.commit", "true");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<String, String>(config);

        tp = new TopicPartition("connect-test", 0);
        ltp = Arrays.asList(tp);
        consumer.assign(ltp);
        consumer.seekToEnd(ltp);

        consume();
    }

    private void newConsumer(long offset){
        consumer = new KafkaConsumer<String, String>(config);
        consumer.assign(ltp);
        consumer.seek(tp, offset);
        consume();
    }

    private void restart(){
        offset = consumer.endOffsets(ltp).get(tp);
        consumer.close();
        consumer = null;
        newConsumer(offset);
    }

    public void consume(){

        long time = System.currentTimeMillis();

        while (true) {
            records = consumer.poll(1000);

            if (records.count() != 0){
                time = System.currentTimeMillis();
                for (ConsumerRecord<String, String> record : records){
                    System.out.println(record.value());
                }
            } else {
                if ((System.currentTimeMillis() - time) >= 30000){
                    restart();
                }
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

在这一刻,它工作了 2 个小时,并重新启动了 14 次。

在我取消这个测试之后,我会尝试你的方式。:)