Edw*_*rzo 5 amqp rabbitmq spring-amqp spring-rabbitmq
我试图了解在分布式数据库事务的上下文中处理RabbitMQ交付的正确模式是什么.
为了简单起见,我将用伪代码来说明我的想法,但实际上我正在使用Spring AMQP来实现这些想法.
什么都喜欢
void foo(message) {
processMessageInDatabaseTransaction(message);
sendMessageToRabbitMQ(message);
}
Run Code Online (Sandbox Code Playgroud)
凡时候,我们到达sendMessageToRabbitMQ()了processMessageInDatabaseTransaction()已经成功提交了对数据库的更改,或到达消息发送代码之前的异常被抛出.
我知道,因为sendMessageToRabbitMQ()我可以使用Rabbit事务或发布者确认,以保证Rabbit得到我的消息.
我感兴趣的是理解当事情向南发生时会发生什么,即当数据库事务成功时,但确认在一定时间后(发布者确认)或Rabbit事务未能提交(使用Rabbit事务)时未到达.
一旦发生这种情况,保证传递信息的正确模式是什么?
当然,在开发了幂等消费者之后,我认为我可以重新发送消息,直到Rabbit确认成功为止:
void foo(message) {
processMessageInDatabaseTransaction(message);
retryUntilSuccessFull {
sendMessagesToRabbitMQ(message);
}
}
Run Code Online (Sandbox Code Playgroud)
但是这种模式有一些我不喜欢的缺点,首先,如果故障延长,我的线程将开始阻塞,我的系统最终会变得无法响应.其次,如果我的系统崩溃或关闭会发生什么?我永远不会传递这些消息,因为它们会丢失.
所以,我想,好吧,我将首先将我的消息写入数据库,处于挂起状态,然后从那里发布我的待处理消息:
void foo(message) {
//transaction commits leaving message in pending status
processMessageInDatabaseTransaction(message);
}
@Poller(every="10 seconds")
void bar() {
for(message in readPendingMessagesFromDbStore()) {
sendPendingMessageToRabbitMQ(message);
if(confirmed) {
acknowledgeMessageInDatabase(message);
}
}
}
Run Code Online (Sandbox Code Playgroud)
如果我无法确认数据库中的消息,可能会多次发送消息.
但现在我已经介绍了其他问题:
然后我想好了,我可以让它变得更复杂,我可以从数据库发布直到我赶上事件的实时流然后发布实时,即保持大小为b的缓冲区(循环缓冲区)为我根据页面检查该消息是否在缓冲区中.如果是,则切换到实时订阅.
到目前为止,我意识到如何做到这一点并不是很明显,所以我得出结论,我需要了解解决这个问题的正确模式是什么.
那么,有没有人建议正确的方法是什么?
虽然 RabbitMQ 无法参与真正的全局 (XA) 事务,但您可以使用 Spring 事务管理来同步数据库事务与 Rabbit 事务,这样如果任一更新失败,两个事务都将回滚。有一个(非常)小的时间漏洞,其中一个可能会提交,但另一个不会,所以你确实需要处理这种可能性。
有关更多详细信息,请参阅Dave Syer 的 Javaworld 文章。
| 归档时间: |
|
| 查看次数: |
1108 次 |
| 最近记录: |