如何将消息保存到数据库中并最终将响应发送到主题中?

gst*_*low 5 java transactions eventual-consistency rabbitmq messagebroker

我有以下RabbitMq消费者:

Consumer consumer = new DefaultConsumer(channel) {
    @Override
     public void handleDelivery(String consumerTag, Envelope envelope, MQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            sendNotificationIntoTopic(message);
            saveIntoDatabase(message);
     }
};
Run Code Online (Sandbox Code Playgroud)

可能会发生以下情况:

  1. 消息已成功发送到主题
  2. 与数据库的连接丢失,因此数据库插入失败。

结果,我们有数据不一致。

预期结果要么两个操作都成功执行,要么都没有执行。

有什么解决方案可以实现吗?

聚苯乙烯

目前我有以下想法(请评论)

我们可以假设经纪人不会丢失任何消息。

我们必须订阅要发送的主题。

  1. 将条目保存到数据库中并设置status值为“ pending”的 字段
  2. 尝试将数据发送到主题。如果发送成功-更新status值为'success'的字段
  3. 我们必须有一份工作,必须检查具有待处理状态的行。目前有两种情况:
    3.1根本没有发送
    通知3.2通知已经发送但是保存到数据库失败(概率很低,但是有可能)

    因此,我们必须以某种方式区分这两种情况:我们可以将主题中的消息存储在集合中,作业可以检查消息是否被接受。因此,如果作业找到一条与数据库行相对应的消息,我们必须将状态更新为“成功”。否则,我们必须从数据库中删除条目。

我认为我的想法有一些弱点(例如,如果我们有多节点应用程序,则必须将消息存储在hazelcast(或类似物)中,但这是假设失败的另一点)

Ale*_*rov 1

这是尝试取消确认模式 https://servicecomb.apache.org/docs/distributed_saga_3/的示例 ,它应该能够处理您的问题。您应该容忍通过队列重复提交数据的可能性。这是一个例子:

  1. 定义抽象操作并为操作分配 ID 和时间戳。
  2. 将状态 Pending 写入数据库(您可以在与 1 相同的步骤中执行此操作)
  3. 编写一个侦听器,轮询数据库以查找状态为挂起且早于“超时”的所有操作
  4. 对于每个挂起的操作,通过具有指定 ID 的队列发送数据。
  5. 接收方应该知道该 ID,如果该 ID 已被处理,则不会发生任何事情。

6A . 如果您需要 100% 完成操作,则需要第二个队列,接收方将在其中发布消息 ID - 完成。如果不需要这种一致性,请跳过此步骤。或者,它可以发布 ID -Failed 失败原因。

6B . 提交方要么等待来自6A的消息,要么通过将状态DONE写入数据库来完成操作。

  • 一旦超过了 sertine 超时或超过了特定的重试限制。您将状态写入操作失败。
  • 您可以通过 ID 回滚向接收方操作发送消息。

请注意,所有这些步骤都不涉及技术事务。您可以使用非事务性数据库来执行此操作。

我所写的是“尝试取消确认模式”的变体,其中每个消息接收者都应该知道如何管理自己的数据。