在Amazon SQS中从DLQ移除消息的最佳方式?

Mat*_*ell 55 amazon-sqs amazon-web-services

将消息从死信队列移回Amazon SQS中的原始队列的最佳做法是什么?

可不可能是

  1. 从DLQ获取消息
  2. 将消息写入队列
  3. 从DLQ删除邮件

或者有更简单的方法吗?

此外,AWS最终会在控制台中使用一个工具来移除DLQ上的消息吗?

小智 86

这是一个快速黑客.这绝对不是最佳或推荐的选择.

  1. 将主SQS队列设置为实际DLQ的DLQ,最大接收为1.
  2. 查看DLQ中的内容(这会将消息移动到主队列,因为这是实际DLQ的DLQ)
  3. 删除设置,使主队列不再是实际DLQ的DLQ

  • 但是,执行此操作时,接收计数不会重置为0.小心. (10认同)
  • 是的,这是一个非常黑客 - 但如果你知道自己在做什么并且没有时间以正确的方式解决这个问题,那么这是一个很好的选择. (7认同)
  • 你是个天才. (3认同)
  • 正确的方法是在 SQS 中配置最大接收计数的重新驱动策略,当消息超过设置的接收计数时,它会自动将消息移动到 DLQ,然后编写一个读取器线程从 DLQ 读取。 (2认同)
  • 几个月前,我为这个问题创建了一个 CLI 工具:https://github.com/renanvieira/phoenix-letter (2认同)

Fro*_*ion 60

2021 年 12 月 1 日, AWS发布了将消息从 DLQ 重新驱动回源队列(或自定义队列)的功能。

通过死信队列重新驱动到源队列,您可以简化和增强标准队列的错误处理工作流程。

DLQ 重新驱动

来源:

引入 Amazon Simple Queue Service 死信队列重新驱动到源队列

  • 注意:目前仅控制台中的标准队列支持 DLQ 重新驱动。 (9认同)

Ula*_*ach 28

有一些脚本可以为您执行此操作:

  • 基于 npm/nodejs:http ://github.com/garryyao/replay-aws-dlq
# install
npm install replay-aws-dlq;

# use
npx replay-aws-dlq [source_queue_url] [dest_queue_url]
Run Code Online (Sandbox Code Playgroud)
# compile: https://github.com/mercury2269/sqsmover#compiling-from-source

# use
sqsmover -s [source_queue_url] -d [dest_queue_url] 
Run Code Online (Sandbox Code Playgroud)

  • 这是最简单的方法,与公认的答案不同。只需从设置了 AWS env vars 属性的终端运行此命令:`npx replay-aws-dlq DL_URI MAIN_URI` (2认同)
  • 尝试了SQSMover,效果很好。 (2认同)

Ash*_*Ash 11

不需要移动消息,因为它会带来许多其他挑战,如重复消息,恢复方案,丢失消息,重复数据删除检查等.

以下是我们实施的解决方案 -

通常,我们将DLQ用于瞬态错误,而不是永久性错误.所以采取以下方法 -

  1. 像常规队列一样从DLQ读取消息

    优点
    • 避免重复的邮件处理
    • 更好地控制DLQ-就像我进行检查一样,只有在完全处理常规队列时才进行处理.
    • 根据DLQ上的消息扩展流程
  2. 然后按照常规队列所遵循的相同代码进行操作.

  3. 在中止作业或处理过程中终止进程时更可靠(例如实例终止或进程终止)

    优点
    • 代码可重用性
    • 错误处理
    • 恢复和消息重播
  4. 扩展消息可见性,以便其他线程不会处理它们.

    效益
    • 避免多线程处理相同的记录.
  5. 仅在存在永久性错误或成功时删除消息.

    效益
    • 继续处理,直到我们遇到瞬态错误.

  • 这确实是生产中的好方法。不过我认为这篇文章只是询问如何将消息从 DLQ 重新发布到普通队列。如果您知道自己在做什么,有时会很方便。 (2认同)

lin*_*hrr 8

我使用 boto3 lib 编写了一个小的 python 脚本来执行此操作:

conf = {
  "sqs-access-key": "",
  "sqs-secret-key": "",
  "reader-sqs-queue": "",
  "writer-sqs-queue": "",
  "message-group-id": ""
}

import boto3
client = boto3.client(
    'sqs',
        aws_access_key_id       = conf.get('sqs-access-key'),
        aws_secret_access_key   = conf.get('sqs-secret-key')
)

while True:
    messages = client.receive_message(QueueUrl=conf['reader-sqs-queue'], MaxNumberOfMessages=10, WaitTimeSeconds=10)

    if 'Messages' in messages:
        for m in messages['Messages']:
            print(m['Body'])
            ret = client.send_message( QueueUrl=conf['writer-sqs-queue'], MessageBody=m['Body'], MessageGroupId=conf['message-group-id'])
            print(ret)
            client.delete_message(QueueUrl=conf['reader-sqs-queue'], ReceiptHandle=m['ReceiptHandle'])
    else:
        print('Queue is currently empty or messages are invisible')
        break
Run Code Online (Sandbox Code Playgroud)

您可以在此链接中获取此脚本

这个脚本基本上可以在任意队列之间移动消息。它支持先进先出队列以及您可以提供该message_group_id字段。


Dav*_*ave 7

这看起来是你最好的选择.在第2步之后,您的进程可能会失败.在这种情况下,您最终会将邮件复制两次,但是应用程序应该处理重新传递邮件(或者不关心).


Bri*_*ley 6

这里:

import boto3
import sys
import Queue
import threading

work_queue = Queue.Queue()

sqs = boto3.resource('sqs')

from_q_name = sys.argv[1]
to_q_name = sys.argv[2]
print("From: " + from_q_name + " To: " + to_q_name)

from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName=to_q_name)

def process_queue():
    while True:
        messages = work_queue.get()

        bodies = list()
        for i in range(0, len(messages)):
            bodies.append({'Id': str(i+1), 'MessageBody': messages[i].body})

        to_q.send_messages(Entries=bodies)

        for message in messages:
            print("Coppied " + str(message.body))
            message.delete()

for i in range(10):
     t = threading.Thread(target=process_queue)
     t.daemon = True
     t.start()

while True:
    messages = list()
    for message in from_q.receive_messages(
            MaxNumberOfMessages=10,
            VisibilityTimeout=123,
            WaitTimeSeconds=20):
        messages.append(message)
    work_queue.put(messages)

work_queue.join()
Run Code Online (Sandbox Code Playgroud)