分布式数据库事务中的RabbitMQ和交付保证

Edw*_*rzo 5 amqp rabbitmq spring-amqp spring-rabbitmq

我试图了解在分布式数据库事务的上下文中处理RabbitMQ交付的正确模式是什么.

为了简单起见,我将用伪代码来说明我的想法,但实际上我正在使用Spring AMQP来实现这些想法.

什么都喜欢

void foo(message) {
   processMessageInDatabaseTransaction(message);
   sendMessageToRabbitMQ(message);
}
Run Code Online (Sandbox Code Playgroud)

凡时候,我们到达sendMessageToRabbitMQ()processMessageInDatabaseTransaction()已经成功提交了对数据库的更改,或到达消息发送代码之前的异常被抛出.

我知道,因为sendMessageToRabbitMQ()我可以使用Rabbit事务发布者确认,以保证Rabbit得到我的消息.

我感兴趣的是理解当事情向南发生时会发生什么,即当数据库事务成功时,但确认在一定时间后(发布者确认)或Rabbit事务未能提交(使用Rabbit事务)时未到达.

一旦发生这种情况,保证传递信息的正确模式是什么?

当然,在开发了幂等消费者之后,我认为我可以重新发送消息,直到Rabbit确认成功为止:

void foo(message) {
   processMessageInDatabaseTransaction(message);
   retryUntilSuccessFull {
      sendMessagesToRabbitMQ(message);
   }
}
Run Code Online (Sandbox Code Playgroud)

但是这种模式有一些我不喜欢的缺点,首先,如果故障延长,我的线程将开始阻塞,我的系统最终会变得无法响应.其次,如果我的系统崩溃或关闭会发生什么?我永远不会传递这些消息,因为它们会丢失.

所以,我想,好吧,我将首先将我的消息写入数据库,处于挂起状态,然后从那里发布我的待处理消息:

void foo(message) {
   //transaction commits leaving message in pending status
   processMessageInDatabaseTransaction(message);
}

@Poller(every="10 seconds")
void bar() {
   for(message in readPendingMessagesFromDbStore()) {
      sendPendingMessageToRabbitMQ(message);
      if(confirmed) {
          acknowledgeMessageInDatabase(message); 
      }
   }
}
Run Code Online (Sandbox Code Playgroud)

如果我无法确认数据库中的消息,可能会多次发送消息.

但现在我已经介绍了其他问题:

  • 需要从数据库执行I/O以发布消息,即99%的时间可以立即成功发布,而无需检查数据库.
  • 由于现在我增加了消息发布的延迟,因此难以使轮询器更接近实时传送.
  • 也许还有其他一些复杂因素,例如保证事件按顺序传递,轮询执行器相互衔接,多个轮询器等等.

然后我想好了,我可以让它变得更复杂,我可以从数据库发布直到我赶上事件的实时流然后发布实时,即保持大小为b的缓冲区(循环缓冲区)为我根据页面检查该消息是否在缓冲区中.如果是,则切换到实时订阅.

到目前为止,我意识到如何做到这一点并不是很明显,所以我得出结论,我需要了解解决这个问题的正确模式是什么.

那么,有没有人建议正确的方法是什么?

Gar*_*ell 2

虽然 RabbitMQ 无法参与真正的全局 (XA) 事务,但您可以使用 Spring 事务管理来同步数据库事务与 Rabbit 事务,这样如果任一更新失败,两个事务都将回滚。有一个(非常)小的时间漏洞,其中一个可能会提交,但另一个不会,所以你确实需要处理这种可能性。

有关更多详细信息,请参阅Dave Syer 的 Javaworld 文章