Dra*_*vic 5 java spring-integration
我正在努力工作几天才能完成这项工作.我想要完成的是从主流中调用不同的子流(即Integration流),基于消息内容和子流完成后返回主流.它就像委托对特定类的责任来完成某些事情并返回主流程.这个责任也需要一些步骤,所以它也作为流程实现.这是我的主要流程:
public IntegrationFlow processingFlow(
MessageChannel eventIn,
MessageChannel eventOut,
ChangedEventsLoader changedEventsLoader,
CalculatorRouter calculatorRouter) {
return IntegrationFlows.from(eventIn)
.handle(changedEventsLoader)
.route(
CalculatorRouter::getSportId,
CalculatorRouter::routeCalculation)
.channel(eventOut)
.get();
Run Code Online (Sandbox Code Playgroud)
}
这是路由器的实现:
@Service
@AllArgsConstructor
public class CalculatorRouter {
private final MessageChannel eventOut;
public RouterSpec<Integer, MethodInvokingRouter> routeCalculation(
RouterSpec<Integer, MethodInvokingRouter> mapping) {
return mapping
.channelMapping(1, "subflowCalculationChannel")
.defaultOutputToParentFlow();
}
public Integer getSportId(Event event) {
return 1;
}
@Bean
public MessageChannel subflowCalculationChannel() {
return MessageChannels.direct().get();
}
}
Run Code Online (Sandbox Code Playgroud)
这是一个子流程的示例:
@Configuration
@AllArgsConstructor
public class CalculatorExample {
@Bean
public IntegrationFlow calculateProbabilities(MessageChannel subflowCalculationChannel) {
return IntegrationFlows.from(subflowCalculationChannel)
.<Event>handle((p, m) -> p * 2)
.get();
}
}
Run Code Online (Sandbox Code Playgroud)
问题是子流错过了与主流的某些连接.我试图通过在路由部分中使用defaultOutputToParentFlow()来解决这个问题,但这还不够.
从某个版本开始,我们决定将 Java DSL 路由器行为与带有注释或 XML 的标准配置保持一致。因此,如果我们发送到路由器,我们就不能指望从那里得到回复。我们只能继续使用通道作为子流的输出。
在你的情况下,你有一个.channel(eventOut)主流。因此,您的所有路由子流都应准确回复此通道:
.<Event>handle((p, m) -> corners1H2HCustomBet.getCalculation(p))
.channel(eventOut)
.get();
Run Code Online (Sandbox Code Playgroud)
我认为.defaultOutputToParentFlow();这对你没有任何帮助,因为你没有默认映射。这已经是一个稍微不同的故事了:它对其他映射没有任何影响。
另请注意此 JavaDoc:
/**
* Add a subflow as an alternative to a {@link #channelMapping(Object, String)}.
* {@link #prefix(String)} and {@link #suffix(String)} cannot be used when subflow
* mappings are used.
* <p> If subflow should refer to the external {@link IntegrationFlow} bean and
* there is a requirement to expect reply from there, such a reference should be
* wrapped with a {@code .gateway()}:
* <pre class="code">
* {@code
* .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
* }
* </pre>
* @param key the key.
* @param subFlow the subFlow.
* @return the router spec.
*/
public RouterSpec<K, R> subFlowMapping(K key, IntegrationFlow subFlow) {
Run Code Online (Sandbox Code Playgroud)
与基于通道的路由配置无关,但将来可能有用。
更新
subFlowMapping这是一个返回主流程的示例(Kotlin) :
@Bean
fun splitRouteAggregate() =
IntegrationFlow { f ->
f.split()
.route<Int, Boolean>({ o -> o % 2 == 0 },
{ m ->
m.subFlowMapping(true) { sf -> sf.gateway(oddFlow()) }
.subFlowMapping(false) { sf -> sf.gateway(evenFlow()) }
})
.aggregate()
}
@Bean
fun oddFlow() =
IntegrationFlow { flow ->
flow.handle<Any> { _, _ -> "odd" }
}
@Bean
fun evenFlow() =
IntegrationFlow { flow ->
flow.handle<Any> { _, _ -> "even" }
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
428 次 |
| 最近记录: |