如何使用 Spring EventListener 进行 sendAndReceive ?

Jan*_*ski 5 spring event-handling

通过project-reactor 中的(同时已弃用的)reactor-bus,我有了 API eventBus.sendAndReceive(Event e, Consumer<?> callback)

这允许通过发布事件来触发执行并自动订阅响应。

使用 Spring eventListeners,我可以从 EventListener 方法发布另一个事件,但我缺少直接订阅返回值的功能。

如何使用 spring 实现相同的行为?如何以编程方式注册/取消注册侦听器以及如何使主题动态化?

Dov*_*vmo 5

使用 Spring 时,ApplicationEventMulticaster您无法订阅响应。您可能注意到该onApplicationEvent方法返回void!这样做的原因是因为它实际上所做的只是ApplicationListener同步调用订阅者(即 ),或者在执行器上异步运行侦听器方法而不返回任何类型的Future.

Spring 的Reactor 项目不久前得到了发展,以更紧密地匹配Reactive Manifesto和类似框架(如 RxJava)。现在使用 Spring 5(Reactor 默认附带),您可以互换使用 Reactor 和 RxJava。

既然如此,针对您的问题:

如何使用 spring 实现相同的行为?

您使用新版本的Reactor CoreFlux以及、Mono等的函数式编程特性。

使用 Spring eventListeners,我可以从 EventListener 方法发布另一个事件,但我缺少直接订阅返回值的功能。

如果您查看 的 APIFlux,您会发现它具有流畅且功能齐全的 API(在某些方面类似于 Java 8 流)。

Flux.just(1, 2, 3, 4)
  .map(value -> value + 1)
  .subscribe(subscriber::function);
Run Code Online (Sandbox Code Playgroud)

这样,您可以对“事件”(即本例中的 1,2,3,4)进行操作,对这些事件的“返回值”执行操作,然后将它们通过管道传递给某些订阅者操作消耗这些事件。

如何以编程方式注册/取消注册侦听器以及如何使主题动态化?

你应该看看这个答案。要注册/取消注册,您可以使用 Reactor 框架中所谓的“完成器”来完成此操作。请参阅Reactor API 中的take函数。他们将向上游发出信号,表明他们基本上希望取消订阅,并且上游生产者应该停止发射。