在反应式场景中什么应该更好:Reactor Kafka 或 Kafka Listener

Jim*_*m C 5 web-component server-sent-events apache-kafka project-reactor spring-webflux

我有一个挑战我的场景:Kafka 主题,我必须使用这些消息并通过 SSE 公开给 Web 组件。我已经在所有层中进行了几次尝试,以寻找一种稳定且更可靠的方法,或者至少有一些我可以更舒适地支持。

现在我创建了一个非常简单的 Kafka 主题并创建了两个不同的使用者,两者的工作方式似乎非常相似。一种是使用 org.springframework.kafka.annotation.KafkaListener 和其他 reactor.kafka.receiver.KafkaReceiver。

最终目标是通过 Spring WebFlux 公开一个事件,当消费者来自主题时发布消息。

如果我没记错的话,我在某处读到 Spring..KafkaListener 正在阻止代码,但据我所知,它不是。它只是一个与 Reactor..KafkaReceiver 完全相同的监听器。因为我正在编写一个非阻塞代码,所以我应该避免阻塞代码,但我不能在任何地方使用 Spring..KafkaListener 阻塞。

以下是产生完全相同结果的基本比较:

反应堆KafkaReceiver:

ReceiverOptions<Object, Object> consumerOptions = ReceiverOptions.create(consumerProps)
        .subscription(Collections.singleton("test"))
        .addAssignListener(partitions -> logger.debug("onPartitionsAssigned {}", partitions))
        .addRevokeListener(partitions -> logger.debug("onPartitionsRevoked {}", partitions));

kafkaReceiver = KafkaReceiver.create(consumerOptions);

((Flux<ReceiverRecord>) kafkaReceiver.receive()).doOnNext(r -> {
    logger.info(String.format("Consumed Message using KafkaListener -> %s", r.value()));
    r.receiverOffset().acknowledge();
}).subscribe();
Run Code Online (Sandbox Code Playgroud)

春季卡夫卡听众:

@KafkaListener(topics = "test")
public void consume(String message) {
    logger.info(String.format("Consumed Message using KafkaListener -> %s", message));
}
Run Code Online (Sandbox Code Playgroud)

如果要重现比较:

1 - clone https://github.com/jimisdrpc/simplest-comparison-kafkaconsumer
2 - create default Kafka topic name test
3 - produce any message to such topic
4 - enable either Reactor Kafka and run again after enable Kafka Listener (I didn't find how make both work in same application at same moment but let's say it isn't that important now)
Run Code Online (Sandbox Code Playgroud)

PS:我不是问哪个更好。我正在尝试比较一个非常具体的场景(反应式编程旨在通过 Spring WebFlux 为 webcomponent 生成事件),并试图保证我没有选择可能以某种方式阻塞代码的场景。