在SQS队列中使用许多使用者

p.m*_*aes 8 queue message-queue amazon-sqs

我知道可以使用多个线程来使用SQS队列.我想保证每条消息都会消耗一次.我知道可以更改消息的可见性超时,例如,等于我的处理时间.如果我的进程花费的时间超过可见性超时(例如,连接速度慢),则其他线程可以使用相同的消息.

保证邮件处理一次的最佳方法是什么?

Kre*_*ase 26

保证邮件处理一次的最佳方法是什么?

你要求保证 - 你不会得到保证.您可以减少被处理的消息的概率比一次更极少量的,但你不会得到保证.

我将解释为什么,以及减少重复的策略.

重复来自何处

  1. 当您在SQS中放入消息时,SQS可能实际上不止一次收到该消息
    • 例如:发送消息时发生轻微的网络打嗝导致自动重试的瞬态错误 - 从消息发送者的角度来看,它失败一次,并成功发送一次,但SQS收到了这两个消息.
  2. SQS可以在内部生成重复项
    • 模拟第一个例子 - 有很多计算机处理消息,SQS需要确保没有丢失 - 消息存储在多个服务器上,这可能导致重复.

在大多数情况下,通过利用SQS消息可见性超时,从这些来源重复的可能性已经相当小 - 比如小百分之几.

如果处理副本确实不是那么糟糕(努力使你的消息消费是幂等的!),我会认为这足够好 - 减少重复的机会进一步复杂并且可能很昂贵......


您的应用程序可以做些什么来进一步减少重复?

好的,这里我们沿着兔子洞走下去...在高层,您将需要为您的消息分配唯一的ID,并在开始处理之前检查正在进行或已完成的ID的原子缓存:

  1. 确保您的邮件在插入时提供了唯一标识符
    • 没有这个,你将无法分辨重复.
  2. 处理邮件"行尾"的重复.
    • 如果您的消息接收者需要发送消息以进行进一步处理,那么它可能是另一个重复源(出于上述类似的原因)
  3. 你需要某个地方以原子方式存储和检查这些唯一ID(并在超时后刷新它们).有两个重要的状态:"InProgress"和"Completed"
    • InProgress条目应根据处理失败时需要恢复的速度超时.
    • 已完成的条目应根据您希望重复数据删除窗口的时长而超时
    • 最简单的可能是Guava缓存,但只适用于单个处理应用程序.如果您有大量消息或分布式消耗,请考虑此作业的数据库(使用后台进程扫描过期条目)
  4. 在处理消息之前,尝试将messageId存储在"InProgress"中.如果已经存在,请停止 - 您只需处理一份副本.
  5. 检查消息是否"已完成"(如果有,则停止)
  6. 您的线程现在对该messageId具有独占锁定 - 处理您的消息
  7. 将messageId标记为"已完成" - 只要此messageId保留在此处,您就不会处理该messageId的任何重复项.
    • 你可能无法承受无限的存储空间.
  8. 从"InProgress"中删除messageId(或者让它从这里过期)

一些笔记

  • 请记住,没有所有这些重复的可能性已经非常低.根据消息的重复数据删除时间和金钱的价值,您可以随意跳过或修改任何步骤
    • 例如,您可以省略"InProgress",但这会使两个线程同时处理重复消息的可能性很小(第二个线程在第一个消息"完成"之前开始)
  • 您的重复数据删除窗口只要您将messageIds保持在"已完成"即可.由于您可能无法承受无限存储空间,因此最后至少要长达2倍的SQS消息可见性超时; 之后重复的可能性降低(除了已经非常低的机会,但仍然无法保证).
  • 即便如此,仍然存在重复的可能性 - 所有预防措施和SQS消息可见性超时有助于将此机会减少到非常小,但机会仍然存在:
    • 您的应用程序可以在处理消息后立即崩溃/挂起/执行一个非常长的GC,但在messageId"已完成"之前(可能您正在为此存储使用数据库并且与该存储的连接已关闭)
    • 在这种情况下,"处理"最终将到期,另一个线程可以处理此消息(在SQS可见性超时也到期之后或因为SQS中有重复).