Mik*_*dan 5 java spring spring-integration project-reactor
来自发行说明(https://spring.io/blog/2017/11/29/spring-integration-5-0-ga-available):
- 通过 FluxMessageChannel、ReactiveStreamsConsumer 和 AbstractMessageHandler 中的直接 org.reactivestreams.Subscriber 实现来支持 Reactive Streams;
我对 Reactor 支持的理解是,例如,您可以从变压器/处理程序返回 Mono/Flux,Spring Integration 会自动将其转换为消息,同时尊重背压。不幸的是,我不能让它像那样工作,例如:
IntegrationFlows.from("input")
.handle((p, h) -> Flux.just(1, 2, 3))
.log("l1")
.channel("output")
.get();
Run Code Online (Sandbox Code Playgroud)
仍然记录一条带有 FluxArray 类型有效负载的消息,而不是三条带有整数有效负载的消息。
2017-12-18 17:12:33.262 INFO 97471 --- [nio-8080-exec-1] l1 : GenericMessage [payload=FluxArray, headers={id=a9701681-9945-f953-8b72-df369c2982a3, timestamp=1513613553262}]
Run Code Online (Sandbox Code Playgroud)
此外,文档中没有任何内容根据这种行为和新的
FluxMessageChannel、ReactiveStreamsConsumer 和 AbstractMessageHandler 中的直接 org.reactivestreams.Subscriber 实现
所以我的问题是,我是否正确理解了已实现的 Reactor 支持,以及在哪里可以找到有关该主题的任何信息?
由于我们在消息传递中,从服务返回什么样的有效负载对于消息来说并不重要,所以所有内容都按原样包装Message。您需要一个特殊的组件来理解此有效负载。其中之一是Splitter。这确定您的有效负载是反应性流Publisher,并作为Flux.
另一个组件WebFluxInboundEndpoint本身支持这种类型的有效负载。
您的自定义服务激活器可能需要Flux作为参数来处理。
但没有什么是自动发生的。Spring Integration 支持响应式类型,但不会在没有最终用户首选项的情况下进行处理。
顺便说一句,splitter应该提供作为输出来通过背压方式FluxMessageChannel处理分裂。Flux
欢迎向 A JIRA 提出有关文档的问题FluxMessageChannel。事实上我们已经错过了这一点。也ReactiveStreamsConsumer需要更多的爱,我们有一些5.1改进反应流模型的计划,我们将尝试使其更加灵活,甚至喜欢默认打开它的选项。但从今天起,一切都无法保证。
| 归档时间: |
|
| 查看次数: |
1293 次 |
| 最近记录: |