如何使用 Spring Boot 等待完整的 Kafka 消息批处理?

Tob*_*ann 3 spring kotlin apache-kafka spring-boot spring-kafka

当批量消费 Kafka 消息时,可以使用 限制批量大小max.poll.records

如果消费者非常快并且其提交偏移量没有明显滞后,这意味着大多数批次将会小得多。我只想接收“完整”批次,即只有在达到批次大小后才调用我的消费者函数。所以我正在寻找类似 的东西min.poll.records,它不以这种形式存在。

这是我正在做的一个最小的例子:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.stereotype.Component

@SpringBootApplication
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}

@Component
class TestConsumer {
    @Bean
    fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
        val configs = kafkaProperties.buildConsumerProperties()
        configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 1000
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
        factory.isBatchListener = true
        return factory
    }

    @KafkaListener(
        topics = ["myTopic"],
        containerFactory = "kafkaBatchListenerContainerFactory"
    )
    fun batchListen(values: List<ConsumerRecord<String, String>>) {
        println(values.count())
    }
}
Run Code Online (Sandbox Code Playgroud)

当以一点消费者滞后开始时,它会输出如下内容:

[...]
1000
1000
1000
[...]
1000
1000
1000
256
27
8
9
3
1
1
23
[...]
Run Code Online (Sandbox Code Playgroud)

有没有什么方法(sleep在“不完整”批次的情况下无需在消费者处理程序中手动添加)在满足以下两个条件之一时调用该函数?- 仅当至少有n消息存在时 - 或者至少m等待了几毫秒

Gar*_*ell 6

卡夫卡没有min.poll.recordsfetch.min.bytes如果您的记录长度相似,您可以使用近似值。另见fetch.max.wait.ms