Kafka消费者不会从最新消息开始

Ehs*_*san 5 apache-kafka kafka-consumer-api

我希望有一个Kafka Consumer,它从一个主题中的最新消息开始.

这是java代码:

private static Properties properties = new Properties();
private static KafkaConsumer<String, String> consumer;
static
{
    properties.setProperty("bootstrap.servers","localhost");
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("group.id", "test");
    properties.setProperty("auto.offset.reset", "latest");
    consumer = new KafkaConsumer<>(properties);

    consumer.subscribe(Collections.singletonList("mytopic"));
}

@Override
public StreamHandler call() throws Exception
{
    while (true) 
    {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(200);
        Iterable<ConsumerRecord<String, String>> records = consumerRecords.records("mytopic");
        for(ConsumerRecord<String, String> rec : records)
        {
            System.out.println(rec.value());
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

尽管auto.offset.reset的值是最新的,但是消费者会在2天前启动表单消息,然后赶上最新的消息.

我错过了什么?

Bri*_*ker 8

您之前是否运行过相同的代码group.idauto.offset.reset仅当您的消费者尚未存储现有偏移量时,才使用该参数.因此,如果您之前运行过该示例,比如说两天前,然后再次运行它,它将从上次消耗的位置开始.

使用seekToEnd(),如果你想手动去到主题的末尾.

有关详细信息,请参阅/sf/answers/2267452211/.