Lib*_*eph 7 message-queue azure azureservicebus
我在Azure中创建了一个服务总线队列,它运行良好.如果消息未在默认尝试(10次)内传递,则它正确地将消息移动到死信队列.
现在,我想将此消息从死信队列重新提交回其发起的队列,并查看它是否再次起作用.我尝试过使用服务总线资源管理器.但它会立即转移到死信队列.
有可能做同样的事情,如果是这样的话怎么样?
Sim*_*mon 15
我们有一批大约 60k 的消息,需要从死信队列中重新处理。通过 Service Bus Explorer 查看并发送回消息,每从我的计算机发送 1000 条消息大约需要 6 分钟。我通过为 DLQ 消息设置转发规则到另一个队列并从那里自动转发到原始队列解决了这个问题。对于所有 60k 消息,此解决方案大约需要 30 秒。
Fan*_*tic 12
我们经常需要重新提交消息。@Baglay-Vyacheslav 的回答很有帮助。我粘贴了一些更新的 C# 代码,可与最新的 Azure.Messaging.ServiceBus Nuget 包配合使用。
使得在队列/主题/订阅者上处理 DLQ 变得更快/更容易。
using Azure.Messaging.ServiceBus;
using System.Collections.Generic;
using System.Threading.Tasks;
using NLog;
namespace ServiceBus.Tools
{
class TransferDeadLetterMessages
{
// https://github.com/Azure/azure-sdk-for-net/blob/Azure.Messaging.ServiceBus_7.2.1/sdk/servicebus/Azure.Messaging.ServiceBus/README.md
private static Logger logger = LogManager.GetCurrentClassLogger();
private static ServiceBusClient client;
private static ServiceBusSender sender;
public static async Task ProcessTopicAsync(string connectionString, string topicName, string subscriberName, int fetchCount = 10)
{
try
{
client = new ServiceBusClient(connectionString);
sender = client.CreateSender(topicName);
ServiceBusReceiver dlqReceiver = client.CreateReceiver(topicName, subscriberName, new ServiceBusReceiverOptions
{
SubQueue = SubQueue.DeadLetter,
ReceiveMode = ServiceBusReceiveMode.PeekLock
});
await ProcessDeadLetterMessagesAsync($"topic: {topicName} -> subscriber: {subscriberName}", fetchCount, sender, dlqReceiver);
}
catch (Azure.Messaging.ServiceBus.ServiceBusException ex)
{
if (ex.Reason == Azure.Messaging.ServiceBus.ServiceBusFailureReason.MessagingEntityNotFound)
{
logger.Error(ex, $"Topic:Subscriber '{topicName}:{subscriberName}' not found. Check that the name provided is correct.");
}
else
{
throw;
}
}
finally
{
await sender.CloseAsync();
await client.DisposeAsync();
}
}
public static async Task ProcessQueueAsync(string connectionString, string queueName, int fetchCount = 10)
{
try
{
client = new ServiceBusClient(connectionString);
sender = client.CreateSender(queueName);
ServiceBusReceiver dlqReceiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions
{
SubQueue = SubQueue.DeadLetter,
ReceiveMode = ServiceBusReceiveMode.PeekLock
});
await ProcessDeadLetterMessagesAsync($"queue: {queueName}", fetchCount, sender, dlqReceiver);
}
catch (Azure.Messaging.ServiceBus.ServiceBusException ex)
{
if (ex.Reason == Azure.Messaging.ServiceBus.ServiceBusFailureReason.MessagingEntityNotFound)
{
logger.Error(ex, $"Queue '{queueName}' not found. Check that the name provided is correct.");
}
else
{
throw;
}
}
finally
{
await sender.CloseAsync();
await client.DisposeAsync();
}
}
private static async Task ProcessDeadLetterMessagesAsync(string source, int fetchCount, ServiceBusSender sender, ServiceBusReceiver dlqReceiver)
{
var wait = new System.TimeSpan(0, 0, 10);
logger.Info($"fetching messages ({wait.TotalSeconds} seconds retrieval timeout)");
logger.Info(source);
IReadOnlyList<ServiceBusReceivedMessage> dlqMessages = await dlqReceiver.ReceiveMessagesAsync(fetchCount, wait);
logger.Info($"dl-count: {dlqMessages.Count}");
int i = 1;
foreach (var dlqMessage in dlqMessages)
{
logger.Info($"start processing message {i}");
logger.Info($"dl-message-dead-letter-message-id: {dlqMessage.MessageId}");
logger.Info($"dl-message-dead-letter-reason: {dlqMessage.DeadLetterReason}");
logger.Info($"dl-message-dead-letter-error-description: {dlqMessage.DeadLetterErrorDescription}");
ServiceBusMessage resubmittableMessage = new ServiceBusMessage(dlqMessage);
await sender.SendMessageAsync(resubmittableMessage);
await dlqReceiver.CompleteMessageAsync(dlqMessage);
logger.Info($"finished processing message {i}");
logger.Info("--------------------------------------------------------------------------------------");
i++;
}
await dlqReceiver.CloseAsync();
logger.Info($"finished");
}
}
}
Run Code Online (Sandbox Code Playgroud)
小智 7
尝试消除死信原因
resubmittableMessage.Properties.Remove("DeadLetterReason");
resubmittableMessage.Properties.Remove("DeadLetterErrorDescription");
Run Code Online (Sandbox Code Playgroud)
完整代码
using Microsoft.ServiceBus.Messaging;
using System.Transactions;
namespace ResubmitDeadQueue
{
class Program
{
static void Main(string[] args)
{
var connectionString = "";
var queueName = "";
var queue = QueueClient.CreateFromConnectionString(connectionString, QueueClient.FormatDeadLetterPath(queueName), ReceiveMode.PeekLock);
BrokeredMessage originalMessage
;
var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
do
{
originalMessage = queue.Receive();
if (originalMessage != null)
{
using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
// Create new message
var resubmittableMessage = originalMessage.Clone();
// Remove dead letter reason and description
resubmittableMessage.Properties.Remove("DeadLetterReason");
resubmittableMessage.Properties.Remove("DeadLetterErrorDescription");
// Resend cloned DLQ message and complete original DLQ message
client.Send(resubmittableMessage);
originalMessage.Complete();
// Complete transaction
scope.Complete();
}
}
} while (originalMessage != null);
}
}
}
Run Code Online (Sandbox Code Playgroud)
感谢这里的其他一些回复!
小智 5
当您修复并重新提交死信队列中的消息时,Service Bus Explorer 工具始终会创建原始消息的克隆。它不会有任何不同,因为默认情况下服务总线消息传递不提供任何消息修复和重新提交机制。我建议您调查一下为什么您的消息在重新提交时会出现在死信队列及其克隆中。希望这可以帮助!
| 归档时间: |
|
| 查看次数: |
5281 次 |
| 最近记录: |