我有非常基本的架构:
Web API 队列中的消息在 Queue 和 Worker(后台处理服务)中出队并处理消息。
问题是 Web API 不知道 Worker 何时处理了消息。
Worker 是否可以提醒队列消息已成功处理,并且队列将“处理完成消息”事件发送回 Web API?
我正在考虑的一种解决方案:
Web API 对消息进行排队后,它每隔几秒检查一次消息状态:
如果消息状态为“Peck-Lock” -消息仍在处理中。
如果在队列中找不到消息- 消息已被处理(成功或不成功并不重要)。
但是微软没有现成的解决方案吗?
我使用 MassTransit 和 Azure 服务总线作为传输。我创建了 IAlert 和 IAlertExt 来实现 IAlert。我正在尝试发布两条消息:
busControl.Publish<IAlert>(new Alert(customerId));
busControl.Publish<IAlertExt>(new AlertExt(customerId));
Run Code Online (Sandbox Code Playgroud)
并编写消费者来处理消息:
public class AlertConsumer : IConsumer<IAlert>
{
public async Task Consume(ConsumeContext<IAlert> context){
...
}
}
public class AlertExtConsumer : IConsumer<IAlertExt>
{
public async Task Consume(ConsumeContext<IAlertExt> context){
...
}
}
Run Code Online (Sandbox Code Playgroud)
现在,发布后,两个消费者都会检索一条使用通用接口发布的消息。
如何强制 AlertExtConsumer 不仅检索使用 IAlertExt 发布的消息,还检索使用 IAlert 发布的消息?
我已经使用许多应用程序订阅的主题将 azure 服务总线的实现编写到我们的应用程序中。我们团队的讨论之一是我们是否坚持使用单个主题并通过消息的属性进行过滤,或者为我们的特定需求创建一个主题。
我们的场景是希望通过优先级和环境变量进行过滤(测试和 uat 环境共享连接)。
那么我们有主题吗(类似):
或者,只是将这些值设置为两个属性的单个主题?
我的偏好是我们创建单独的主题,因为我们将利用可用的功能,并且我想在高负载下这会更好地扩展?我读过查看大队列可能效率低下。订阅单个主题似乎也更干净。
任何意见,将不胜感激。
我有一个队列处理器,它从 ServiceBus 队列中检索所有消息。我想知道应该如何确定 MessageReceiver PrefetchCount和 ReceiveBatch messageCount以优化性能。我目前将它们设置为任意数字 500,如下所示:
var receiverFactory = MessagingFactory.CreateFromConnectionString("ConnectionString");
var receiver = await receiverFactory.CreateMessageReceiverAsync("QueueName", ReceiveMode.PeekLock);
receiver.PrefetchCount = 500;
bool loopBatch = true;
while (loopBatch)
{
var tempMessages = await receiver.ReceiveBatchAsync(500, TimeSpan.FromSeconds(1));
// Do some message processing...
loopBatch = tempMessages.Any();
}
Run Code Online (Sandbox Code Playgroud)
运行时,我发现我的批次通常需要时间“预热”,检索诸如“1, 1, 1, 1, 1, 1, 1, 1, 125, 125, 125, 125...”之类的计数,其中批次检索数突然跳得更高。
来自预取优化文档:
当使用默认的锁过期时间 60 秒时,SubscriptionClient.PrefetchCount 的最佳值是工厂所有接收器的最大处理速率的 20 倍。例如,一个工厂创建3个接收器,每个接收器每秒最多可以处理10条消息。预取计数不应超过 20 X 3 X 10 = 600。默认情况下,QueueClient.PrefetchCount 设置为 0,这意味着不会从服务中提取任何其他消息。
当批量检索似乎一次检索的消息数量差异很大时,我真的不明白如何确定接收者的“每秒消息数”。任何帮助将不胜感激。
我目前正在研究Azure 服务总线,但我不明白队列与具有单个订阅的主题相比的真正好处。我发现在这两种情况下都可以创建不同的应用程序来侦听单个队列(或订阅)以更快地处理消息。
那么,如果使用主题可以以同样的方式工作,那么使用天蓝色服务总线队列的真正便利是什么?更好的性能?这个更便宜?
azureservicebus azure-servicebus-queues azure-servicebus-topics
MessageLockLostExceptions我在处理消息时不断收到消息。
LockDuration设置为 30 秒的队列,其中已经包含许多要处理的消息。现在我想LockDuration通过添加 来模拟运行时间稍长的消息处理任务(但仍在 之内)Task.Delay(10_000)。但随后我MessageLockLostException每收到 4 条消息就会收到一条消息。
即使我设置了MaxAutoRenewDuration = TimeSpan.FromDays(30)和 ,也会发生这种情况PrefetchCount = 0。
这是消息处理方法,我稍微修改一下,打印出剩余的锁定时长:
private static async Task processMessagesAsync(Message message, CancellationToken token)
{
Console.Write($"Received message: {message.SystemProperties.SequenceNumber}. Remaining lock duration: {message.SystemProperties.LockedUntilUtc - DateTime.UtcNow}");
await Task.Delay(10000);
await queueClient.CompleteAsync(message.SystemProperties.LockToken);
Console.WriteLine(" - Complete!");
}
Run Code Online (Sandbox Code Playgroud)
示例输出:
======================================================
Press ENTER key to exit after receiving all the messages.
====================================================== …Run Code Online (Sandbox Code Playgroud) 我正在试验 Azure 和 Azure 服务总线。我只想将消息推送到队列中,然后让我的 C# azure 函数日志看到该消息。我正在努力让它发挥作用。我创建了一个天蓝色帐户,创建了一个资源组,创建了一个服务总线,下载了存储资源管理器。我有共享访问策略“RootManageSharedAccessKey”,它在门户中选中了“托管”选项。
那么你猜我接下来要做什么?我已转到 azure 函数并在 azure 门户中创建了一个函数,如下所示。当我打开存储资源管理器时,我不确定使用什么选项来连接到我的 azure 实例?
有什么帮助吗?谢谢你!
天蓝色函数
using System;
using System.Threading.Tasks;
public static void Run(string myQueueItem, ILogger log)
{
log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
}
Run Code Online (Sandbox Code Playgroud) 我以为我可以用它来实现某种冗余,但 URI 几乎与 Primary 相同(唯一的区别是 SharedAccessKey)。
我编写了一段代码,一次性从 Azure 服务总线队列中读取 1000 条消息。我用以下行读取消息:await receiver.ReceiveMessagesAsync(1000);但只收到消息的子集。
我从示例中获取了代码: https : //github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample01_HelloWorld.cs,SendAndReceiveMessageSafeBatch() 方法
这是我的代码:
public class Program
{
static void Main(string[] args)
{
SendAndReceiveMessage().GetAwaiter().GetResult();
}
public static async Task SendAndReceiveMessage()
{
var connectionString = "myconnectionstring";
var queueName = "myqueue";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);
// create the sender
var sender = client.CreateSender(queueName);
IList<ServiceBusMessage> messages = new List<ServiceBusMessage>();
for (var i = 0; i < 1000; i++) …Run Code Online (Sandbox Code Playgroud) 我在 Azure 中将此应用程序服务作为 Web 作业运行。该应用程序是使用 NServiceBus 构建的前 Windows 服务,我一直在重新配置它以在 Azure 中工作。
该应用程序正在使用 AzureServiceBusTransport,并且我的 Azure 服务总线设置了队列。
我得到它的工作......当我使用在我的连接字符串中配置的共享访问密钥时,如下所示;
"ConnectionString": "Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=***"
Run Code Online (Sandbox Code Playgroud)
为了使其与托管身份一起使用,我在 NServiceBus EndpointConfiguration 中重新配置了:
transport.CustomTokenCredential(new DefaultAzureCredential());
Run Code Online (Sandbox Code Playgroud)
我还将连接字符串更改为以下内容;
"ConnectionString": "Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;Authentication=ManagedIdentity"
Run Code Online (Sandbox Code Playgroud)
但是当我尝试使用托管身份时,我收到以下异常;
ArgumentException:值“Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;Authentication=ManagedIdentity”不是格式良好的服务总线完全限定命名空间。
当我使用共享访问密钥时,MYNAMESPACE 显然是正确的,但当我使用托管身份时却不正确?
我们确实有一个可以使用托管身份的 Azure 功能,并且在这种情况下使用:
"ConnectionString": "Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;Authentication=ManagedIdentity"
Run Code Online (Sandbox Code Playgroud) azureservicebus ×10
azure ×7
c# ×4
servicebus ×2
.net ×1
async-await ×1
azure-queues ×1
masstransit ×1
nservicebus ×1