发件箱模式 - 我们如何防止消息中继过程生成重复的消息?

cod*_*ent 5 apache-kafka spring-cloud spring-cloud-stream

实现发件箱模式的通常方法是将消息有效负载存储在发件箱表中,并有一个单独的进程(消息中继)查询待处理的消息,并将它们发布到消息代理中,在我的例子中是 Kafka。

发件箱表的状态可能如下所示。

 OUTBOX TABLE
 ---------------------------------
|ID | STATE     | TOPIC | PAYLOAD |
 ---------------------------------
| 1 | PROCESSED | user            |
| 2 | PENDING   | user            |
| 3 | PENDING   | billing         |
----------------------------------
Run Code Online (Sandbox Code Playgroud)

My Message Relay 是一个 Spring Boot/Cloud Stream 应用程序,它定期 ( @Scheduled) 查找 PENDING 记录,将它们发布到 Kafka 并将记录更新为 PROCESSED 状态。

第一个问题是:如果我启动 Message Relay 的多个实例,所有实例都会查询 Outbox 表,并且可能在某些时候不同的实例将获得相同的 PENDING 注册表以发布到 Kafka,从而生成重复的消息。我怎样才能防止这种情况?

另一种情况:假设只有一个消息中继。它获取一个 PENDING 记录,将其发布到主题,但在将记录更新为 PROCESSED 之前崩溃。当它再次启动时,它会找到相同的 PENDING 记录并再次发布它。有没有办法避免这种重复,或者唯一的方法是设计一个幂等系统。

Sim*_*lli 5

为了防止第一个问题,您必须使用数据库锁定。

SELECT * FROM outbox WHERE id = 1 FOR UPDATE
Run Code Online (Sandbox Code Playgroud)

这将防止其他进程访问同一行。

第二个问题你无法解决,因为你没有使用 Kafka 进行分布式事务。

因此,一种方法可能是在将记录发送到 Kafka 之前将其设置为“PROCESSING”状态,如果应用程序崩溃,您应该检查是否有处于“PROCESSING”状态的记录,并执行一些清理任务以查明它们是否已发送到 Kafka 。

但最好的解决方案是拥有一个可以处理重复项的幂等系统。