dam*_*fie 7 java kafka-consumer-api reactive-kafka spring-kafka reactor-kafka
我正在尝试在我的 Spring Boot 应用程序中实现反应式 kafka 消费者,我正在查看这些示例:https : //github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main /java/reactor/kafka/samples/SampleScenarios.java
看起来反应式 kafka 中尚不支持 Spring
我了解 kafka 侦听器如何在 Spring 中的非反应式 kafka API 中工作:最简单的解决方案是为 ConcurrentKafkaListenerContainerFactory 和 ConsumerFactory 配置 bean,然后使用 @KafkaListener 注释和瞧
但是我现在不确定如何在 Spring 中正确使用反应式 kafka。
基本上我需要一个话题的听众。我应该创建某种循环或我自己的调度程序吗?或者也许我错过了一些东西。任何人都可以分享他们的知识和最佳实践吗?
Paw*_*nar -1
我还没有现成的解决方案,但我正在尝试这个(Kotlin 代码,Spring Boot)。有人在这里发布了部分代码片段https://github.com/reactor/reactor-kafka/issues/100
@EventListener(ApplicationStartedEvent::class)
fun onSomeEvent() {
kafkaReceiver
.receive()
.doOnNext { record ->
val myEvent = record.value()
processMyEvent(myEvent).thenEmpty {
record.receiverOffset().acknowledge()
}
}
.doOnError {
/* todo */
}
.subscribe()
}
Run Code Online (Sandbox Code Playgroud)
查看其他堆栈溢出问题。内容不多,但也许能给你一些想法