Ehs*_*san 5 apache-kafka kafka-consumer-api
我希望有一个Kafka Consumer,它从一个主题中的最新消息开始.
这是java代码:
private static Properties properties = new Properties();
private static KafkaConsumer<String, String> consumer;
static
{
properties.setProperty("bootstrap.servers","localhost");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("group.id", "test");
properties.setProperty("auto.offset.reset", "latest");
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("mytopic"));
}
@Override
public StreamHandler call() throws Exception
{
while (true)
{
ConsumerRecords<String, String> consumerRecords = consumer.poll(200);
Iterable<ConsumerRecord<String, String>> records = consumerRecords.records("mytopic");
for(ConsumerRecord<String, String> rec : records)
{
System.out.println(rec.value());
}
}
}
Run Code Online (Sandbox Code Playgroud)
尽管auto.offset.reset的值是最新的,但是消费者会在2天前启动表单消息,然后赶上最新的消息.
我错过了什么?
您之前是否运行过相同的代码group.id?auto.offset.reset仅当您的消费者尚未存储现有偏移量时,才使用该参数.因此,如果您之前运行过该示例,比如说两天前,然后再次运行它,它将从上次消耗的位置开始.
使用seekToEnd(),如果你想手动去到主题的末尾.
有关详细信息,请参阅/sf/answers/2267452211/.
| 归档时间: |
|
| 查看次数: |
5405 次 |
| 最近记录: |