Spring Cloud AWS (1.0.0.RC2) 中 SimpleMessageListenerContainer 类的当前实现似乎会在消息处理程序完成消息处理并且方法调用返回后自动删除消息。
在我们的应用程序中,我们需要能够处理消息并等待下游队列的异步确认,然后再从 SQS 上游队列中删除消息。就像是
接收SQS消息 -> 处理消息 -> 发布消息到RabbitMQ(线程在此完成)
删除 SQS 消息 <- 我们的应用 <- RabbitMQ 消息成功 Ack(异步)
由于消息确认通过不同的线程异步返回,因此我们需要在检查成功确认后手动从 SQS 删除消息的选项。
理想情况下,SimpleMessageListener 应可配置其运行模式(自动删除或手动删除)。
我们非常希望使用 spring aws 云库(而不是推出我们自己的)来与 SQS 集成,因为它已经负责侦听器容器 bean 生命周期管理。
请告诉我上述建议的功能是否可行,如果可行,何时可以实施和发布。
谢谢。
我们可以再添加一个标志(除了已经存在的deleteMessageOnException标志之外),以完全禁用消息的自动删除,即使在成功处理的情况下也是如此。我看到的问题是有害消息不再被处理并且可能会破坏队列。我在这里为此创建了一个问题。
你的方法还会遇到另一个问题。如果消息没有足够快地删除(基于可见性超时),它将再次出现在您的处理程序方法中。
接收SQS msg1 -> 处理msg1 -> 发布msg1到RabbitMQ(线程在此完成)
接收SQS msg2 -> 处理msg2 -> 发布msg2到RabbitMQ(线程在此完成)
接收SQS msg1 -> 处理msg1 -> 发布msg1到RabbitMQ msg1又来了,因为没有删除
删除 SQS msg1 <- 我们的应用 <- RabbitMQ Msg1 成功 Ack(异步)
目前一个非常丑陋的解决方法可能是在处理程序方法中抛出异常并将deleteMessageOnException标志设置为false。因此,不会删除任何消息,您可以获取收据句柄(使用@Header或@Headers)来手动删除它们。
编辑
该问题现已解决,人们可以直接使用@SqsListener
注释定义删除策略并使用注入的 Acknowledgment 对象。查看此问题的最后评论
归档时间: |
|
查看次数: |
3619 次 |
最近记录: |