如何防止重复的SQS消息?

Mar*_*ind 37 amazon-sqs amazon-web-services

防止Amazon SQS中重复邮件的最佳方法是什么?我有一个等待被抓取的域的SQS.在我向SQS添加新域之前,我可以检查保存的数据,看看它是否最近被抓取,以防止重复.

问题在于尚未抓取的域.例如,如果队列中有1000个域尚未被爬网.任何这些链接都可以再次添加,一次又一次.这使我的SQS膨胀成数十万条主要是重复的消息.

我该如何防止这种情况?有没有办法从队列中删除所有重复项?或者有没有办法在添加消息之前搜索队列中的消息?我觉得任何有SQS的人都必须经历过这个问题.

我可以看到的一个选项是,在将域添加到SQS之前,我是否存储了一些数据.但是,如果我必须将数据存储两次,那么首先会破坏使用SQS的重点.

hen*_*wan 45

正如提到的其他答案,您无法阻止来自SQS的重复消息.

大多数情况下,您的消息将被传递给您的一个消费者一次,但您在某个阶段遇到重复消息.

我不认为这个问题有一个简单的答案,因为它需要提出一个可以应对重复的适当架构,这意味着它本质上是幂等的.

如果分布式体系结构中的所有工作者都是幂等的,那么这很容易,因为您不必担心重复.但实际上,这种环境并不存在,某种程度上某些东西无法处理它.

我目前正在开展一个项目,要求我解决这个问题,并提出一个处理它的方法.我认为在这里分享我的想法可能会让其他人受益.它可能是一个获得我的思考反馈的好地方.

事实商店

开发服务是一个非常好的主意,这样他们就可以收集理论上可以重放的事实,以便在所有受影响的下游系统中重现相同的状态.

例如,假设您正在为股票交易平台构建消息代理.(我之前曾经做过这样的项目,这很糟糕,但也是一次很好的学习经历.)

现在让我们说这些交易进来了,有3个系统感兴趣:

  1. 需要保持更新的旧式大型机
  2. 一种系统,用于整理所有交易并与FTP服务器上的合作伙伴共享
  3. 记录交易的服务,并将股份重新分配给新所有者

我知道,这有点令人费解,但想法是有一条消息(事实)进入,有各种分布式下游效应.

现在让我们假设我们维护一个事实商店,记录进入我们经纪商的所有交易.并且所有3个下游服务所有者都打电话给我们告诉我们他们已经丢失了过去3天的所有数据.FTP下载落后3天,大型机落后3天,所有交易都落后3天.

因为我们有事实存储,所以我们理论上可以将所有这些消息从特定时间重放到特定时间.在我们的例子中,从3天前到现在.下游服务可能会被赶上.

这个例子可能看起来有点过分,但我试图传达一些非常特别的东西:事实是要记录的重要事项,因为我们将在我们的架构中使用它来对抗重复.

Fact store如何帮助我们处理重复的消息

如果您在持久层上实现事实存储,该持久层为您提供CAP定理,一致性和可用性的CA部分,您可以执行以下操作:

一旦从队列收到消息,您就可以在事实存储区中检查您之前是否已经看过此消息,如果有,则此时是否已锁定,以及是否处于暂挂状态.在我的情况下,我将使用MongoDB来实现我的事实存储,因为我对它非常熟悉,但是其他各种数据库技术应该能够处理这个问题.

如果该事实尚不存在,则会将其插入到事实存储中,具有挂起状态和锁定到期时间.这应该使用原子操作来完成,因为你不希望这发生两次!这是您确保服务的幂等性的地方.

快乐的情况 - 大部分时间都会发生

当Fact存储回到您的服务,告诉它事实不存在,并且创建了一个锁时,该服务会尝试执行此操作.完成后,它会删除SQS消息,并将事实标记为已完成.

重复的消息

这就是当消息通过时发生的事情并且它不是重复的.但是让我们看一下重复的消息何时进入.服务选择它,并要求事实存储用锁来记录它.事实商店告诉它它已经存在,并且已被锁定.该服务忽略该消息并跳过它!一旦消息处理完成,由另一个工作人员完成,它将从队列中删除该消息,我们将不再看到它.

灾难案例 - 很少发生

那么当一个服务第一次在商店中记录这个事实,然后在一段时间内获得一个锁定但是会倒下时会发生什么?SQS会再次向您发送一条消息,如果它被拾取,但在从队列中提供后的一段时间内没有被删除.这就是为什么我们编码我们的事实存储,使服务在有限的时间内保持锁定.因为如果它崩溃了,我们希望SQS在稍后的时间向服务或其他实例呈现消息,允许该服务假设该事实应该再次并入状态(已执行).

  • 这不再有效。FifoQueue 和 DeduplicationId 现在可以以 5 分钟的间隔折叠重复消息。请参阅下面的@ayman eltabakh。 (3认同)
  • 真棒的答案!轻微的挑剔:我会说在**Happy Case**中你应该将事实标记为已完成,_然后_删除SQS消息.然后我还建议更新**Duplicate message**case以删除消息,如果事实已被标记为已完成(不等待原始处理程序执行此操作). (2认同)

aym*_*akh 14

Amazon SQS 推出具有 Exactly-Once 处理和更低价格的标准队列的 FIFO 队列

使用 Amazon SQS 消息重复数据删除 ID 消息重复数据删除 ID 是用于对已发送消息进行重复数据删除的令牌。如果成功发送具有特定消息重复数据删除 ID 的消息,则成功接受使用相同消息重复数据删除 ID 发送的任何消息,但不会在 5 分钟重复数据删除间隔内传送。

Amazon SQS 推出 FIFO 队列

使用 Amazon SQS 消息重复数据删除 ID

  • 重复数据删除仅在 5 分钟间隔内起作用。 (4认同)
  • @MaxSchindler 你好,Max,FIFO 队列提供了额外的功能,有助于防止消息生产者发送无意的重复项或消息消费者接收无意的重复项。在 markus-lind 提出问题两年后,我们引入了 FIFO 队列。 (2认同)

Kee*_*asa 7

根据 AWS Docs,Exactly-Once Processing提供了一种避免重复消息的方法。

与标准队列不同,FIFO 队列不会引入重复消息。FIFO 队列可帮助您避免向队列发送重复项。如果您在 5 分钟重复数据删除间隔内重试 SendMessage 操作,Amazon SQS 不会将任何重复项引入队列。

如果您的队列是 FIFO 队列并启用了基于内容的复制,则可以利用此功能避免在重复数据删除间隔期间出现重复消息。有关更多信息,请阅读本和以下链接。

https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-sqs-queues.html#cfn-sqs-queue-contentbaseddeduplication

  • FIFO 不支持主题到队列机制! (2认同)

Séb*_*acq 6

没有 API 级别的方法可以防止将重复消息发布到 SQS 队列。恐怕您需要在应用程序级别处理此问题。

例如,您可以使用 DynamoDB 表来存储等待被抓取的域名,并且仅当它们不在 DynamoDB 中时才将它们添加到队列中。

  • 但如果我这样做,为什么还要使用 SQS?为什么不让应用程序直接从 DynamoDB 读取?也许我误解了 SQS 的使用,但是如果我仍然需要将所有数据存储在数据库中,我觉得 SQS 失去了它的价值和意义。对我来说,我想使用 SQS 的原因是不需要将数据写入数据库。 (8认同)
  • 这是一个架构决定。SQS(或任何排队系统)在允许应用程序之间进行异步通信以及让多个消息消费者消费来自多个生产者的消息方面非常出色。示例将在 Web 层和一批批处理人员之间。数据库不是为这些类型的通信设计的,需要额外的工作。但 DB 擅长在独立工作人员或应用程序之间共享状态。在您的用例中,也许一个数据库就足够了。 (4认同)