如何在聚合过滤后重新排序/Spring Integration/

Fer*_*uri 1 integration filtering spring-integration aggregation

我正在做一个 Spring Integration 项目,我遇到了一个大问题。流程中有一些过滤组件,稍后在流程中我有一个聚合元素。

问题是过滤组件不支持“apply-sequence”属性。它过滤掉一些记录,但不修改原始序列号,但减少了消息数量。在流程的稍后部分,我需要一个无法释放元素的聚合,因为某些消息被过滤掉了。

我不想使用任何具有 apply-sequence 属性的特殊路由元素。您能为我建议此类过滤问题的通用解决方案吗?

谢谢,

Art*_*lan 5

我想说你误解了filter和的行为aggregator

我猜你apply-sequence在上游有一些感知组件。因此,该组中的所有消息都接受多个标头 -correlationId以默认对消息进行分组 aggregatorsequenceNumber-index消息的名称;sequenceSize- 组中的消息数。

Filter只是检查消息的某些条件并将其发送到outpu-channelor doesdiscard逻辑。它不会修改消息。然而,即使我们能做到这一点,听起来也不太好。

假设组中只有两条消息。第一个可以进行过滤 - 我们只需将其发送到aggregator. 但第二个被丢弃,并且,是的,它不会被发送到聚合器。最后一个永远不会释放该组,因为sequenceSize尚未达到。

为了满足您的要求,您需要进行一些自定义(默认情况ReleaseStrategyaggregatorSequenceSizeReleaseStrategy)。例如,检查系统中的某些状态,即组中的所有消息均已独立于 发送truefalse在 后产生filter。或者出于同样的原因有一些fake消息并检查其在组中的可用性。

在这种情况下,您只需要注意correlationId在聚合器中对消息进行分组即可。

更新

对于这种情况,建议的发布策略是什么?使用超时作为发布策略是一个好的策略吗?

我可以说,有时对于某些集成场景找到好的解决方案确实很困难。消息传递是stateless本质上的,因此对不确定数量的消息进行关联和分组可能是一个问题。

需要看需求和环境。

例如,当您的所有消息都在单个线程中处理时,您可以安全地将一些fake标记消息直接发送到最后aggregator并从中检查它ReleaseStrategy。即使您来自该组的所有消息都可能被丢弃,它仍然可以工作。

如果您并行处理这些消息或者它们是从不同线程接收的,您实际上将无法确定消息的顺序以及每个进程的时间。

在这种情况下,TimeoutCountSequenceSizeReleaseStrategy确实可以提供帮助。当然,需要根据系统的要求找到合适的时间范围折衷方案。