Spring集成路由到不同类的子流

Dra*_*vic 5 java spring-integration

我正在努力工作几天才能完成这项工作.我想要完成的是从主流中调用不同的子流(即Integration流),基于消息内容和子流完成后返回主流.它就像委托对特定类的责任来完成某些事情并返回主流程.这个责任也需要一些步骤,所以它也作为流程实现.这是我的主要流程:

public IntegrationFlow processingFlow(
  MessageChannel eventIn,
  MessageChannel eventOut,
  ChangedEventsLoader changedEventsLoader,
  CalculatorRouter calculatorRouter) {

return IntegrationFlows.from(eventIn)
    .handle(changedEventsLoader)
    .route(
        CalculatorRouter::getSportId,
        CalculatorRouter::routeCalculation)
    .channel(eventOut)
    .get();
Run Code Online (Sandbox Code Playgroud)

}

这是路由器的实现:

@Service
@AllArgsConstructor
public class CalculatorRouter {
  private final MessageChannel eventOut;

  public RouterSpec<Integer, MethodInvokingRouter> routeCalculation(
      RouterSpec<Integer, MethodInvokingRouter> mapping) {
    return mapping
        .channelMapping(1, "subflowCalculationChannel")
        .defaultOutputToParentFlow();
  }

  public Integer getSportId(Event event) {
    return 1;
  }

  @Bean
  public MessageChannel subflowCalculationChannel() {
    return MessageChannels.direct().get();
  }
}
Run Code Online (Sandbox Code Playgroud)

这是一个子流程的示例:

@Configuration
@AllArgsConstructor
public class CalculatorExample {

  @Bean
  public IntegrationFlow calculateProbabilities(MessageChannel subflowCalculationChannel) {
    return IntegrationFlows.from(subflowCalculationChannel)
        .<Event>handle((p, m) -> p * 2)
        .get();
  }
}
Run Code Online (Sandbox Code Playgroud)

问题是子流错过了与主流的某些连接.我试图通过在路由部分中使用defaultOutputToParentFlow()来解决这个问题,但这还不够.

Art*_*lan 2

从某个版本开始,我们决定将 Java DSL 路由器行为与带有注释或 XML 的标准配置保持一致。因此,如果我们发送到路由器,我们就不能指望从那里得到回复。我们只能继续使用通道作为子流的输出。

在你的情况下,你有一个.channel(eventOut)主流。因此,您的所有路由子流都应准确回复此通道:

    .<Event>handle((p, m) -> corners1H2HCustomBet.getCalculation(p))
    .channel(eventOut)
    .get();
Run Code Online (Sandbox Code Playgroud)

我认为.defaultOutputToParentFlow();这对你没有任何帮助,因为你没有默认映射。这已经是一个稍微不同的故事了:它对其他映射没有任何影响。

另请注意此 JavaDoc:

/**
 * Add a subflow as an alternative to a {@link #channelMapping(Object, String)}.
 * {@link #prefix(String)} and {@link #suffix(String)} cannot be used when subflow
 * mappings are used.
 * <p> If subflow should refer to the external {@link IntegrationFlow} bean and
 * there is a requirement to expect reply from there, such a reference should be
 * wrapped with a {@code .gateway()}:
 * <pre class="code">
 * {@code
 *     .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
 * }
 * </pre>
 * @param key the key.
 * @param subFlow the subFlow.
 * @return the router spec.
 */
public RouterSpec<K, R> subFlowMapping(K key, IntegrationFlow subFlow) {
Run Code Online (Sandbox Code Playgroud)

与基于通道的路由配置无关,但将来可能有用。

更新

subFlowMapping这是一个返回主流程的示例(Kotlin) :

    @Bean
    fun splitRouteAggregate() =
            IntegrationFlow { f ->
                f.split()
                        .route<Int, Boolean>({ o -> o % 2 == 0 },
                                { m ->
                                    m.subFlowMapping(true) { sf -> sf.gateway(oddFlow()) }
                                            .subFlowMapping(false) { sf -> sf.gateway(evenFlow()) }
                                })
                        .aggregate()
            }

    @Bean
    fun oddFlow() =
            IntegrationFlow { flow ->
                flow.handle<Any> { _, _ -> "odd" }
            }

    @Bean
    fun evenFlow() =
            IntegrationFlow { flow ->
                flow.handle<Any> { _, _ -> "even" }
            }
Run Code Online (Sandbox Code Playgroud)