乱序处理后的“重新排序”消息

L.T*_*.T. 6 distributed message-queue redis node.js kue

我正在研究什么基本上是一个高度可用的分布式消息传递系统。系统通过 HTTP 或 TCP 从某处接收消息,对其执行各种转换,然后将其发送到一个或多个目的地(也使用 TCP/HTTP)。

系统要求发送到给定目的地的所有消息都是有序的,因为有些消息建立在先前消息的内容之上。这限制了我们按顺序处理消息,每条消息大约需要 750 毫秒。因此,例如,如果有人每 250 毫秒向我们发送一条消息,我们将被迫将消息排在彼此后面。这最终会在高负载下在消息处理中引入无法容忍的延迟,因为每条消息在轮到它之前可能必须等待数百条其他消息被处理。

为了解决这个问题,我希望能够在不违反我们按顺序发送它们的要求的情况下并行化我们的消息处理。

我们可以轻松地横向扩展我们的处理。缺失的部分是一种确保即使消息被乱序处理的方法,它们也会被“重新排序”并按照接收顺序发送到目的地。我正在努力寻找实现这一目标的最佳方式。

Apache Camel 有一个叫做 Resequencer 的东西可以做到这一点,它包括一个很好的图表(我没有足够的代表直接嵌入)。这正是我想要的:接收无序消息并将它们按顺序排列的东西。

但是,我不希望它是用 Java 编写的,我需要该解决方案具有高可用性(即能够抵抗典型的系统故障,如崩溃或系统重新启动),我认为 Apache Camel 无法提供。

我们的应用程序是用 Node.js 编写的,使用 Redis 和 Postgresql 进行数据持久化。我们将Kue库用于我们的消息队列。尽管 Kue 提供优先排队,但功能集对于上述用例来说太有限了,所以我认为我们需要一种替代技术来与 Kue 协同工作来重新排序我们的消息。

我试图在网上研究这个话题,但我找不到预期的那么多信息。看起来像分布式架构模式的类型,会有大量的文章和实现,但我没有看到那么多。搜索诸如“消息重新排序”、“乱序处理”、“并行消息处理”等内容,可以找到大多数只是放宽基于分区或主题或诸如此类的“有序”要求的解决方案。或者,他们谈论在单台机器上的并行化。我需要一个解决方案:

  • 可以以任何顺序同时处理多条消息。
  • 将始终按照消息到达系统的顺序发送消息,无论它们以何种顺序进行处理。
  • 可从 Node.js 使用
  • 可以在 HA 环境中运行(即它的多个实例同时运行在同一个消息队列上,没有不一致的情况。)

我们目前的计划,对我来说很有意义,但我在网上找不到任何描述,是使用 Redis 来维护一组进行中和准备发送的消息,按到达时间排序。粗略地说,它是这样工作的:

  1. 当收到一条消息时,该消息被放入进行中的集合中。
  2. 当消息处理完成时,该消息被放入准备发送集。
  3. 只要在进行中和准备发送集的前面都有相同的消息,就可以发送该消息并且它会按顺序进行。

我会编写一个小型 Node 库,它使用原子 Redis 事务通过优先级队列式 API 实现此行为。但这只是我自己想出来的,所以我想知道:是否有其他技术(理想情况下使用我们已经在使用的 Node/Redis 堆栈)可以解决重新排序乱序消息的问题? 或者是否有其他术语可以用作我可以用作研究关键字的此问题?谢谢你的帮助!

Fil*_*und 3

这是一个常见问题,因此肯定有很多解决方案可用。这也是一个相当简单的问题,也是分布式系统领域很好的学习机会。我建议你自己写。

构建这个时你会遇到一些问题,即

2:仅一次传送
1:保证消息顺序
2:仅一次传送

您已经找到了第 1 个,并且通过在 redis 中对它们重新排序来解决这个问题,这是一个不错的解决方案。然而,另一个问题尚未解决。

看起来您的架构并不适合容错,因此目前,如果服务器崩溃,您可以重新启动它并继续您的生活。当按顺序处理所有请求时,这种方法效果很好,因为这样您就可以根据最后一个成功完成的请求确切地知道崩溃的时间。

您需要的是找出您实际完成了哪些请求以及哪些请求失败的策略,或者是在出现故障时向客户发送的一封写得很好的道歉信。

如果Redis不分片,那么它是强一致的。如果单个节点崩溃,它将失败并可能丢失所有数据,但您不会遇到任何数据无序或数据突然出现和消失的问题。因此,单个 Redis 节点可以保证,如果将消息插入到处理集中,然后插入到完成集中,则任何节点都不会在完成集中看到该消息,除非该消息也在目标处理集中。流程集。

我会怎么做

使用 redis 似乎太模糊了,假设消息不是很大,如果进程崩溃,丢失它们是可以的,并且多次运行它们,甚至同时运行单个请求的多个副本也不是问题。问题。

我建议设置一个主管服务器,它接受传入的请求,将每个请求分派到随机选择的从站,存储响应并在发送它们之前再次将它们放回顺序。您说您预计处理需要 750 毫秒。如果从设备在 2 秒内没有响应,则在 0-1 秒内随机将其再次调度到另一个节点。第一个响应的就是我们要使用的。谨防重复的回复。

如果重试请求也失败,则将最大等待时间加倍。在大约 5 次失败之后,每次等待的时间最多为前一次的两倍(或任何大于 1 的倍数),我们可能会遇到永久性错误,因此我们可能应该请求人工干预。该算法称为指数退避,可防止请求突然激增而导致整个集群崩溃。不使用随机间隔,并且在 n 秒后重试可能会导致每 n 秒发生一次 DOS 攻击,直到集群死亡(如果它获得足够大的负载峰值)。

有很多方法可能会失败,因此请确保该系统不是存储数据的唯一位置。然而,这可能在 99% 以上的时间里都能工作,它可能至少和你当前的系统一样好,而且你可以用几百行代码来实现它。只需确保您的主管使用异步请求,以便您可以处理重试和超时。JavaScript 本质上是单线程的,所以这比正常情况稍微棘手一些,但我相信你可以做到。