小编her*_*sen的帖子

KafkaConsumer:`seekToEnd()`不会让消费者从最新的偏移量开始消费

我有以下代码

class Consumer(val consumer: KafkaConsumer<String, ConsumerRecord<String>>) {

    fun run() {
        consumer.seekToEnd(emptyList())
        val pollDuration = 30 // seconds

        while (true) {
            val records = consumer.poll(Duration.ofSeconds(pollDuration))
            // perform record analysis and commitSync()
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

消费者订阅的主题持续接收记录。有时,消费者会因处理步骤而崩溃。当消费者重新启动时,我希望它从主题的最新偏移量开始消费(即忽略消费者关闭时发布到主题的记录)。我认为该seekToEnd()方法可以确保这一点。然而,这个方法似乎根本没有效果。消费者从崩溃的偏移量开始消费。

正确的使用方法是什么seekToEnd()

编辑:使用以下配置创建消费者

fun <T> buildConsumer(valueDeserializer: String): KafkaConsumer<String, T> {
    val props = setupConfig(valueDeserializer)
    Common.setupConsumerSecurityProtocol(props)
    return createConsumer(props)
}

fun setupConfig(valueDeserializer: String): Properties {
    // Configuration setup
    val props = Properties()

    props[ConsumerConfig.GROUP_ID_CONFIG] = config.applicationId
    props[ConsumerConfig.CLIENT_ID_CONFIG] = config.kafka.clientId
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.kafka.bootstrapServers
    props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = config.kafka.schemaRegistryUrl …
Run Code Online (Sandbox Code Playgroud)

kotlin apache-kafka kafka-consumer-api

8
推荐指数
1
解决办法
3599
查看次数

标签 统计

apache-kafka ×1

kafka-consumer-api ×1

kotlin ×1