我有以下代码
class Consumer(val consumer: KafkaConsumer<String, ConsumerRecord<String>>) {
fun run() {
consumer.seekToEnd(emptyList())
val pollDuration = 30 // seconds
while (true) {
val records = consumer.poll(Duration.ofSeconds(pollDuration))
// perform record analysis and commitSync()
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
消费者订阅的主题持续接收记录。有时,消费者会因处理步骤而崩溃。当消费者重新启动时,我希望它从主题的最新偏移量开始消费(即忽略消费者关闭时发布到主题的记录)。我认为该seekToEnd()方法可以确保这一点。然而,这个方法似乎根本没有效果。消费者从崩溃的偏移量开始消费。
正确的使用方法是什么seekToEnd()?
编辑:使用以下配置创建消费者
fun <T> buildConsumer(valueDeserializer: String): KafkaConsumer<String, T> {
val props = setupConfig(valueDeserializer)
Common.setupConsumerSecurityProtocol(props)
return createConsumer(props)
}
fun setupConfig(valueDeserializer: String): Properties {
// Configuration setup
val props = Properties()
props[ConsumerConfig.GROUP_ID_CONFIG] = config.applicationId
props[ConsumerConfig.CLIENT_ID_CONFIG] = config.kafka.clientId
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.kafka.bootstrapServers
props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = config.kafka.schemaRegistryUrl …Run Code Online (Sandbox Code Playgroud)