alo*_*rey 3 spring-amqp project-reactor
我有一个使用Boot 2.0和webflux的应用程序,并且有一个端点返回Flux of ServerSentEvent.通过利用spring-amqp从RabbitMQ队列中消耗消息来创建事件.我的问题是:如何最好地将已MessageListener配置的侦听器方法桥接到可以传递给控制器的Flux?
Project Reactor的create部分提到"将现有的API与反应世界联系起来非常有用 - 例如基于侦听器的异步API",但我不确定如何直接挂钩消息监听器,因为它包含在DirectMessageListenerContainer和MessageListenerAdapter.他们的例子来自创建部分:
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
Run Code Online (Sandbox Code Playgroud)
到目前为止,我最好的选择是创建一个Processor并且onNext()每次在RabbitMQ监听器方法中调用以手动生成事件.
我有这样的事情:
@SpringBootApplication
@RestController
public class AmqpToWebfluxApplication {
public static void main(String[] args) {
ConfigurableApplicationContext applicationContext = SpringApplication.run(AmqpToWebfluxApplication.class, args);
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend("foo", "event-" + i);
}
}
private TopicProcessor<String> sseFluxProcessor = TopicProcessor.share("sseFromAmqp", Queues.SMALL_BUFFER_SIZE);
@GetMapping(value = "/sseFromAmqp", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getSeeFromAmqp() {
return this.sseFluxProcessor;
}
@RabbitListener(id = "fooListener", queues = "foo")
public void handleAmqpMessages(String message) {
this.sseFluxProcessor.onNext(message);
}
}
Run Code Online (Sandbox Code Playgroud)
该TopicProcessor.share()允许有多个并发用户,我们得到当我们回到这个TopicProcessor作为Flux我们/sseFromAmqp通过WebFlux REST请求.
该@RabbitListener方法仅委派其收到的消息说TopicProcessor.
在main()我有一个代码,以确认我可以发布到TopicProcessor即使没有订阅者.
curl通过RabbitMQ Management Plugin 测试两个单独的会话并将消息发布到队列.
顺便说一句,share()因为我使用:https://projectreactor.io/docs/core/release/reference/#_topicprocessor
在共享配置中创建时,从多个上游发布者
那'因为那@RabbitListener真的可以同时从不同的ListenerContainer线程调用.
UPDATE
我还把这个样本移到我的Sandbox:https://github.com/artembilan/sendbox/tree/master/amqp-to-webflux
| 归档时间: |
|
| 查看次数: |
1738 次 |
| 最近记录: |