在spring-webflux中使用Spring AMQP使用者

alo*_*rey 3 spring-amqp project-reactor

我有一个使用Boot 2.0和webflux的应用程序,并且有一个端点返回Flux of ServerSentEvent.通过利用spring-amqp从RabbitMQ队列中消耗消息来创建事件.我的问题是:如何最好地将已MessageListener配置的侦听器方法桥接到可以传递给控制器​​的Flux?

Project Reactor的create部分提到"将现有的API与反应世界联系起来非常有用 - 例如基于侦听器的异步API",但我不确定如何直接挂钩消息监听器,因为它包含在DirectMessageListenerContainerMessageListenerAdapter.他们的例子来自创建部分:

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( 
      new MyEventListener<String>() { 

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); 
          }
        }

        public void processComplete() {
            sink.complete(); 
        }
    });
});
Run Code Online (Sandbox Code Playgroud)

到目前为止,我最好的选择是创建一个Processor并且onNext()每次在RabbitMQ监听器方法中调用以手动生成事件.

Art*_*lan 6

我有这样的事情:

@SpringBootApplication
@RestController
public class AmqpToWebfluxApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(AmqpToWebfluxApplication.class, args);

        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("foo", "event-" + i);
        }

    }

    private TopicProcessor<String> sseFluxProcessor = TopicProcessor.share("sseFromAmqp", Queues.SMALL_BUFFER_SIZE);

    @GetMapping(value = "/sseFromAmqp", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> getSeeFromAmqp() {
        return this.sseFluxProcessor;
    }

    @RabbitListener(id = "fooListener", queues = "foo")
    public void handleAmqpMessages(String message) {
        this.sseFluxProcessor.onNext(message);
    }

}
Run Code Online (Sandbox Code Playgroud)

TopicProcessor.share()允许有多个并发用户,我们得到当我们回到这个TopicProcessor作为Flux我们/sseFromAmqp通过WebFlux REST请求.

@RabbitListener方法仅委派其收到的消息说TopicProcessor.

main()我有一个代码,以确认我可以发布到TopicProcessor即使没有订阅者.

curl通过RabbitMQ Management Plugin 测试两个单独的会话并将消息发布到队列.

顺便说一句,share()因为我使用:https://projectreactor.io/docs/core/release/reference/#_topicprocessor

在共享配置中创建时,从多个上游发布者

那'因为那@RabbitListener真的可以同时从不同的ListenerContainer线程调用.

UPDATE

我还把这个样本移到我的Sandbox:https://github.com/artembilan/sendbox/tree/master/amqp-to-webflux

  • 对,我做到了.这是Reactor团队的推荐.他们将在下一个版本中删除/替换`Topic/WorkQueue`.为了正确的解决方案,我们必须等待Reactor RabbitMQ项目:https://github.com/reactor/reactor-rabbitmq (2认同)