Tob*_*ann 3 spring kotlin apache-kafka spring-boot spring-kafka
当批量消费 Kafka 消息时,可以使用 限制批量大小max.poll.records。
如果消费者非常快并且其提交偏移量没有明显滞后,这意味着大多数批次将会小得多。我只想接收“完整”批次,即只有在达到批次大小后才调用我的消费者函数。所以我正在寻找类似 的东西min.poll.records,它不以这种形式存在。
这是我正在做的一个最小的例子:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.stereotype.Component
@SpringBootApplication
class Application
fun main(args: Array<String>) {
runApplication<Application>(*args)
}
@Component
class TestConsumer {
@Bean
fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
val configs = kafkaProperties.buildConsumerProperties()
configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 1000
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
factory.isBatchListener = true
return factory
}
@KafkaListener(
topics = ["myTopic"],
containerFactory = "kafkaBatchListenerContainerFactory"
)
fun batchListen(values: List<ConsumerRecord<String, String>>) {
println(values.count())
}
}
Run Code Online (Sandbox Code Playgroud)
当以一点消费者滞后开始时,它会输出如下内容:
[...]
1000
1000
1000
[...]
1000
1000
1000
256
27
8
9
3
1
1
23
[...]
Run Code Online (Sandbox Code Playgroud)
有没有什么方法(sleep在“不完整”批次的情况下无需在消费者处理程序中手动添加)在满足以下两个条件之一时调用该函数?- 仅当至少有n消息存在时 - 或者至少m等待了几毫秒
| 归档时间: |
|
| 查看次数: |
3929 次 |
| 最近记录: |