Spring Integration 5.0 Reactor 类型支持

Mik*_*dan 5 java spring spring-integration project-reactor

来自发行说明(https://spring.io/blog/2017/11/29/spring-integration-5-0-ga-available):

  • 通过 FluxMessageChannel、ReactiveStreamsConsumer 和 AbstractMessageHandler 中的直接 org.reactivestreams.Subscriber 实现来支持 Reactive Streams;

我对 Reactor 支持的理解是,例如,您可以从变压器/处理程序返回 Mono/Flux,Spring Integration 会自动将其转换为消息,同时尊重背压。不幸的是,我不能让它像那样工作,例如:

IntegrationFlows.from("input")
                .handle((p, h) -> Flux.just(1, 2, 3))
                .log("l1")
                .channel("output")
                .get();
Run Code Online (Sandbox Code Playgroud)

仍然记录一条带有 FluxArray 类型有效负载的消息,而不是三条带有整数有效负载的消息。

2017-12-18 17:12:33.262  INFO 97471 --- [nio-8080-exec-1] l1 : GenericMessage [payload=FluxArray, headers={id=a9701681-9945-f953-8b72-df369c2982a3, timestamp=1513613553262}]
Run Code Online (Sandbox Code Playgroud)

此外,文档中没有任何内容根据这种行为和新的

FluxMessageChannel、ReactiveStreamsConsumer 和 AbstractMessageHandler 中的直接 org.reactivestreams.Subscriber 实现

所以我的问题是,我是否正确理解了已实现的 Reactor 支持,以及在哪里可以找到有关该主题的任何信息?

Art*_*lan 4

由于我们在消息传递中,从服务返回什么样的有效负载对于消息来说并不重要,所以所有内容都按原样包装Message。您需要一个特殊的组件来理解此有效负载。其中之一是Splitter。这确定您的有效负载是反应性流Publisher,并作为Flux.

另一个组件WebFluxInboundEndpoint本身支持这种类型的有效负载。

您的自定义服务激活器可能需要Flux作为参数来处理。

但没有什么是自动发生的。Spring Integration 支持响应式类型,但不会在没有最终用户首选项的情况下进行处理。

顺便说一句,splitter应该提供作为输出来通过背压方式FluxMessageChannel处理分裂。Flux

欢迎向 A JIRA 提出有关文档的问题FluxMessageChannel。事实上我们已经错过了这一点。也ReactiveStreamsConsumer需要更多的爱,我们有一些5.1改进反应流模型的计划,我们将尝试使其更加灵活,甚至喜欢默认打开它的选项。但从今天起,一切都无法保证。