cod*_*ent 5 apache-kafka spring-cloud spring-cloud-stream
实现发件箱模式的通常方法是将消息有效负载存储在发件箱表中,并有一个单独的进程(消息中继)查询待处理的消息,并将它们发布到消息代理中,在我的例子中是 Kafka。
发件箱表的状态可能如下所示。
OUTBOX TABLE
---------------------------------
|ID | STATE | TOPIC | PAYLOAD |
---------------------------------
| 1 | PROCESSED | user |
| 2 | PENDING | user |
| 3 | PENDING | billing |
----------------------------------
Run Code Online (Sandbox Code Playgroud)
My Message Relay 是一个 Spring Boot/Cloud Stream 应用程序,它定期 ( @Scheduled) 查找 PENDING 记录,将它们发布到 Kafka 并将记录更新为 PROCESSED 状态。
第一个问题是:如果我启动 Message Relay 的多个实例,所有实例都会查询 Outbox 表,并且可能在某些时候不同的实例将获得相同的 PENDING 注册表以发布到 Kafka,从而生成重复的消息。我怎样才能防止这种情况?
另一种情况:假设只有一个消息中继。它获取一个 PENDING 记录,将其发布到主题,但在将记录更新为 PROCESSED 之前崩溃。当它再次启动时,它会找到相同的 PENDING 记录并再次发布它。有没有办法避免这种重复,或者唯一的方法是设计一个幂等系统。
为了防止第一个问题,您必须使用数据库锁定。
SELECT * FROM outbox WHERE id = 1 FOR UPDATE
Run Code Online (Sandbox Code Playgroud)
这将防止其他进程访问同一行。
第二个问题你无法解决,因为你没有使用 Kafka 进行分布式事务。
因此,一种方法可能是在将记录发送到 Kafka 之前将其设置为“PROCESSING”状态,如果应用程序崩溃,您应该检查是否有处于“PROCESSING”状态的记录,并执行一些清理任务以查明它们是否已发送到 Kafka 。
但最好的解决方案是拥有一个可以处理重复项的幂等系统。
| 归档时间: |
|
| 查看次数: |
1340 次 |
| 最近记录: |