gst*_*low 5 java transactions eventual-consistency rabbitmq messagebroker
我有以下RabbitMq消费者:
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, MQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
sendNotificationIntoTopic(message);
saveIntoDatabase(message);
}
};
Run Code Online (Sandbox Code Playgroud)
可能会发生以下情况:
结果,我们有数据不一致。
预期结果要么两个操作都成功执行,要么都没有执行。
有什么解决方案可以实现吗?
目前我有以下想法(请评论)
我们可以假设经纪人不会丢失任何消息。
我们必须订阅要发送的主题。
status值为“ pending”的 字段status值为'success'的字段我们必须有一份工作,必须检查具有待处理状态的行。目前有两种情况:
3.1根本没有发送
通知3.2通知已经发送但是保存到数据库失败(概率很低,但是有可能)
因此,我们必须以某种方式区分这两种情况:我们可以将主题中的消息存储在集合中,作业可以检查消息是否被接受。因此,如果作业找到一条与数据库行相对应的消息,我们必须将状态更新为“成功”。否则,我们必须从数据库中删除条目。
我认为我的想法有一些弱点(例如,如果我们有多节点应用程序,则必须将消息存储在hazelcast(或类似物)中,但这是假设失败的另一点)
这是尝试取消确认模式 https://servicecomb.apache.org/docs/distributed_saga_3/的示例 ,它应该能够处理您的问题。您应该容忍通过队列重复提交数据的可能性。这是一个例子:
6A . 如果您需要 100% 完成操作,则需要第二个队列,接收方将在其中发布消息 ID - 完成。如果不需要这种一致性,请跳过此步骤。或者,它可以发布 ID -Failed 失败原因。
6B . 提交方要么等待来自6A的消息,要么通过将状态DONE写入数据库来完成操作。
请注意,所有这些步骤都不涉及技术事务。您可以使用非事务性数据库来执行此操作。
我所写的是“尝试取消确认模式”的变体,其中每个消息接收者都应该知道如何管理自己的数据。
| 归档时间: |
|
| 查看次数: |
289 次 |
| 最近记录: |