Mat*_*ell 55 amazon-sqs amazon-web-services
将消息从死信队列移回Amazon SQS中的原始队列的最佳做法是什么?
可不可能是
或者有更简单的方法吗?
此外,AWS最终会在控制台中使用一个工具来移除DLQ上的消息吗?
小智 86
这是一个快速黑客.这绝对不是最佳或推荐的选择.
Fro*_*ion 60
2021 年 12 月 1 日, AWS发布了将消息从 DLQ 重新驱动回源队列(或自定义队列)的功能。
通过死信队列重新驱动到源队列,您可以简化和增强标准队列的错误处理工作流程。
来源:
引入 Amazon Simple Queue Service 死信队列重新驱动到源队列
Ula*_*ach 28
有一些脚本可以为您执行此操作:
# install
npm install replay-aws-dlq;
# use
npx replay-aws-dlq [source_queue_url] [dest_queue_url]
Run Code Online (Sandbox Code Playgroud)
# compile: https://github.com/mercury2269/sqsmover#compiling-from-source
# use
sqsmover -s [source_queue_url] -d [dest_queue_url]
Run Code Online (Sandbox Code Playgroud)
Ash*_*Ash 11
不需要移动消息,因为它会带来许多其他挑战,如重复消息,恢复方案,丢失消息,重复数据删除检查等.
以下是我们实施的解决方案 -
通常,我们将DLQ用于瞬态错误,而不是永久性错误.所以采取以下方法 -
像常规队列一样从DLQ读取消息
优点然后按照常规队列所遵循的相同代码进行操作.
在中止作业或处理过程中终止进程时更可靠(例如实例终止或进程终止)
优点扩展消息可见性,以便其他线程不会处理它们.
效益仅在存在永久性错误或成功时删除消息.
效益我使用 boto3 lib 编写了一个小的 python 脚本来执行此操作:
conf = {
"sqs-access-key": "",
"sqs-secret-key": "",
"reader-sqs-queue": "",
"writer-sqs-queue": "",
"message-group-id": ""
}
import boto3
client = boto3.client(
'sqs',
aws_access_key_id = conf.get('sqs-access-key'),
aws_secret_access_key = conf.get('sqs-secret-key')
)
while True:
messages = client.receive_message(QueueUrl=conf['reader-sqs-queue'], MaxNumberOfMessages=10, WaitTimeSeconds=10)
if 'Messages' in messages:
for m in messages['Messages']:
print(m['Body'])
ret = client.send_message( QueueUrl=conf['writer-sqs-queue'], MessageBody=m['Body'], MessageGroupId=conf['message-group-id'])
print(ret)
client.delete_message(QueueUrl=conf['reader-sqs-queue'], ReceiptHandle=m['ReceiptHandle'])
else:
print('Queue is currently empty or messages are invisible')
break
Run Code Online (Sandbox Code Playgroud)
您可以在此链接中获取此脚本
这个脚本基本上可以在任意队列之间移动消息。它支持先进先出队列以及您可以提供该message_group_id
字段。
这里:
import boto3
import sys
import Queue
import threading
work_queue = Queue.Queue()
sqs = boto3.resource('sqs')
from_q_name = sys.argv[1]
to_q_name = sys.argv[2]
print("From: " + from_q_name + " To: " + to_q_name)
from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName=to_q_name)
def process_queue():
while True:
messages = work_queue.get()
bodies = list()
for i in range(0, len(messages)):
bodies.append({'Id': str(i+1), 'MessageBody': messages[i].body})
to_q.send_messages(Entries=bodies)
for message in messages:
print("Coppied " + str(message.body))
message.delete()
for i in range(10):
t = threading.Thread(target=process_queue)
t.daemon = True
t.start()
while True:
messages = list()
for message in from_q.receive_messages(
MaxNumberOfMessages=10,
VisibilityTimeout=123,
WaitTimeSeconds=20):
messages.append(message)
work_queue.put(messages)
work_queue.join()
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
28194 次 |
最近记录: |