在 Spring Boot 应用程序中实现 Reactive Kafka Listener

dam*_*fie 7 java kafka-consumer-api reactive-kafka spring-kafka reactor-kafka

我正在尝试在我的 Spring Boot 应用程序中实现反应式 kafka 消费者,我正在查看这些示例:https : //github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main /java/reactor/kafka/samples/SampleScenarios.java

看起来反应式 kafka 中尚不支持 Spring

我了解 kafka 侦听器如何在 Spring 中的非反应式 kafka API 中工作:最简单的解决方案是为 ConcurrentKafkaListenerContainerFactory 和 ConsumerFactory 配置 bean,然后使用 @KafkaListener 注释和瞧

但是我现在不确定如何在 Spring 中正确使用反应式 kafka。

基本上我需要一个话题的听众。我应该创建某种循环或我自己的调度程序吗?或者也许我错过了一些东西。任何人都可以分享他们的知识和最佳实践吗?

Paw*_*nar -1

我还没有现成的解决方案,但我正在尝试这个(Kotlin 代码,Spring Boot)。有人在这里发布了部分代码片段https://github.com/reactor/reactor-kafka/issues/100

@EventListener(ApplicationStartedEvent::class)
fun onSomeEvent() {
    kafkaReceiver
        .receive()
        .doOnNext { record ->
            val myEvent = record.value()
            processMyEvent(myEvent).thenEmpty {
                record.receiverOffset().acknowledge()
            }
        }
        .doOnError {
            /* todo */
        }
        .subscribe()
}
Run Code Online (Sandbox Code Playgroud)

查看其他堆栈溢出问题。内容不多,但也许能给你一些想法