Mongo Change Streams 运行多次(某种程度):运行多个实例的节点应用程序

Pau*_*oud 9 mongodb node.js changestream

我的 Node 应用程序使用 Mongo 更改流,并且该应用程序在生产中运行 3 个以上实例(最终会更多,因此随着它的增长,这将成为一个更大的问题)。因此,当发生变更时,变更流功能运行的次数与进程的数量一样多。

如何进行设置以使更改流仅运行一次?

这是我所得到的:

const options = { fullDocument: "updateLookup" };

const filter = [
  {
    $match: {
      $and: [
        { "updateDescription.updatedFields.sites": { $exists: true } },
        { operationType: "update" }
      ]
    }
  }
];

const sitesStream = Client.watch(sitesFilter, options);

// Start listening to site stream
sitesStream.on("change", async change => {
  console.log("in site change stream", change);
  console.log(
    "in site change stream, update desc",
    change.updateDescription
  );

  // Do work...
  console.log("site change stream done.");
  return;
});
Run Code Online (Sandbox Code Playgroud)

小智 5

仅使用 Mongodb 查询运算符即可轻松完成。您可以在 ID 字段上添加模查询,其中除数是应用程序实例的数量 (N)。余数就是 {0, 1, 2, ..., N-1} 的元素。如果您的应用程序实例按从 0 到 N-1 的升序编号,您可以像这样编写过滤器:

const filter = [
  {
    "$match": {
      "$and": [
        // Other filters
        { "_id": { "$mod": [<number of instances>, <this instance's id>]}}
      ]
    }
  }
];
Run Code Online (Sandbox Code Playgroud)

  • 如果实例自动扩展以满足需求,这将不起作用。 (5认同)
  • 如果实例 N-1 也失败,这将不起作用。 (2认同)

小智 5

在强有力的保证下做到这一点很困难,但并非不可能。我在这里写了一个解决方案的详细信息: https: //www.alechenninger.com/2020/05/building-kafka-like-message-queue-with.html

这些例子是用Java编写的,但重要的部分是算法。

这归结为一些技巧:

  • 每个进程都尝试获取锁
  • 每个锁(或每个更改)都有一个关联的隔离令牌
  • 处理每个更改必须是幂等的
  • 在处理更改时,令牌用于确保有序、一次性有效的更新。

博客文章中有更多详细信息。


Pau*_*oud 1

虽然 Kafka 选项听起来很有趣,但在我不熟悉的平台上需要进行大量基础设施工作,因此我决定选择离家更近的东西,将 MQTT 消息发送到一个小型独立应用程序,并让 MQTT 服务器监视消息的唯一性。

siteStream.on("change", async change => {
  console.log("in site change stream);
  const mqttClient = mqtt.connect("mqtt://localhost:1883");
  const id = JSON.stringify(change._id._data);
  // You'll want to push more than just the change stream id obviously...
  mqttClient.on("connect", function() {
    mqttClient.publish("myTopic", id);
    mqttClient.end();
  });
});
Run Code Online (Sandbox Code Playgroud)

我仍在制定 MQTT 服务器的最终版本,但是评估消息唯一性的方法可能会在应用程序内存中存储一​​组更改流 ID,因为不需要保留它们,也不需要评估是否继续进行任何操作进一步基于之前是否见过该更改流 ID。

var mqtt = require("mqtt");
var client = mqtt.connect("mqtt://localhost:1883");
var seen = [];
client.on("connect", function() {
  client.subscribe("myTopic");
});
client.on("message", function(topic, message) {
  context = message.toString().replace(/"/g, "");
  if (seen.indexOf(context) < 0) {
    seen.push(context);
    // Do stuff
  }
});
Run Code Online (Sandbox Code Playgroud)

这不包括安全性等,但你明白了。