在JMS(ActiveMQ)中使用事务

pro*_*bra 5 activemq-classic middleware transactions jms

在我们的后端中,有几种服务可通过带有Apache ActiveMQ的JMS发送和接收消息。每个服务都有一个到ActiveMQ代理的会话。现在,我们要执行以下操作(伪代码):

服务s1:

Message m = createMessage("s2","Hello World")
sendMessage(m)
try {
   Message answer = commit()
   ...
} catch (TransactionFailedException e){
   ...
}
Run Code Online (Sandbox Code Playgroud)

服务s2:

onMessageReceive:
try {
   Message m = getReceivedMessage()
   Message answer = doSomeStuff()
   send(answer)
} (Exception e) {
   rollback()
}
Run Code Online (Sandbox Code Playgroud)

显然,提交必须阻塞直到答案到达或事务失败为止。服务s2也有可能创建新的嵌套事务,因为s2正在向另一服务发送消息。如何使用ActiveMQ的事务来实现此行为?有一些示例可用,但是在这些示例中,事务仅用作批处理机制来发送消息。

djn*_*jna 2

我将您的问题解释为您希望 s2 中的工作失败导致 s1 中正在进行的事务回滚。

所以你要

s1 do some work

s2 do some work

if ( s2 OK )
   perhaps do even more work in s1
   commit s1
else
   rollback s1
Run Code Online (Sandbox Code Playgroud)

JMS 的异步模型通常不是为此目的而设计的。根本原因是因为

  1. 在事务期间,资源(例如数据库记录)被锁定 - s1 已完成一些工作,直到事务解决为止,这些资源必须保持锁定。
  2. 异步处理旨在将 s2 工作与 s1 工作分离,s2 工作可能会在请求放入队列后几分钟甚至几天内发生。S2无法知道s1是否还在等他。JMS的整个设计点就是解耦s1和s2中的处理。

有两种方法可以实现 s1 和 s2 之间的协调:

一种是使用真正的分布式事务,使用 JMS 之外的协议,例如 EJB 可以将事务从一个进程传播到另一个进程,或者使用 WS-AtomicTransaction 和 Web 服务。这确实增加了操作复杂性 - 您必须管理事务日志和某个组件长期出现故障的场景。

另一种方法是设计两个协作系统来稳健地处理“补偿”工作。您接受这一点,例如 s2 可能会失败,并进行二次处理来处理重新发送请求、处理超时等。最终您可能会遇到一些需要人类参与的情况,但通过良好的设计,这些情况可以是最小。在大型系统中,通常没有其他选择,例如航空公司的预订系统和连锁酒店的预订系统很可能不是为分布式事务协调而设计的,因此除了进行一些仔细的处理来管理预订航班和房间之外,别无选择。