在处理消息时正确处理 Azure 服务总线客户端

The*_*Man 3 c# azure azureservicebus

使用时,async/await我关心的是在处理消息的过程中处理客户端。考虑以下:

  1. 初始化队列客户端queueClient并将对它的引用存储在类的全局范围内

  2. 队列客户端处理一条消息并调用一些应用程序代码来处理它,这可能最终会做一些异步数据库工作或调用远程 api。

  3. 考虑应用程序是一个 Windows 服务,它的CloseAsync方法是在服务应该关闭时发出信号。在这个方法中,我调用queueClient.CloseAsync()

  4. 步骤 2 中正在完成的工作完成并转到调用message.Complete(). 在这一点上,我假设 queueClient 已关闭,消息将被视为失败。

确保队列客户端不再处理消息并等待关闭直到任何当前处理的消息完成的最佳实践是什么?

Pet*_*ons 5

在等待调用 queueClient.CloseAsync() 之前,您可以使用 CancellationToken 取消步骤 2 的工作和/或等待步骤 4 中的异步消息处理代码。我认为您对Tasks 和 Cancellation很熟悉。

等待消息处理任务

  1. 初始化队列客户端queueClient并将对它的引用存储在类的全局范围内

  2. 队列客户端处理一条消息并调用一些应用程序代码来处理它,这可能最终会做一些异步数据库工作或调用远程 api,例如public Task HandleMessageAsync() {..}。在类的全局范围内存储对此任务的引用。例如private Task messageHandleTask;

  3. 考虑应用程序是一个 Windows 服务,它的CloseAsync方法是在服务应该关闭时发出信号。在这个方法中,我先调用await messageHandleTask然后await queueClient.CloseAsync()

  4. 我们都活得长久而幸福。

在这种情况下,在消息处理完成之前,服务不会完全停止。

取消消息处理任务

  1. 初始化队列客户端queueClient并将对它的引用存储在类的全局范围内

  2. 队列客户端处理一条消息并调用一些应用程序代码来处理它,传递一个CancellationToken,这可能最终会做一些异步数据库工作或调用远程 api,例如public Task HandleMessageAsync(CancellationToken token) {..}。在类的全局范围内存储对此任务的引用。

  3. 考虑应用程序是一个 Windows 服务,它的CloseAsync方法是在服务应该关闭时发出信号。在这个方法中,我先调用cancellationTokenSource.Cancel(),然后调用,await messageHandleTask最后调用await queueClient.CloseAsync()

  4. 我们都活得长久而幸福。

在这种情况下,在消息处理代码中,就在调用message.Complete().您之前检查是否有任何取消:token.ThrowIfCancellationRequested。在这种情况下,当服务关闭时,消息永远不会达到完成状态,稍后将进行处理。(请注意,我不知道所涉及的代码,因此如果在取消发生之前工作已经部分完成,这种情况可能会很复杂)请务必处理任何OperationCanceledException.

并发消息处理

在并发处理多个消息的场景中,涉及更多的逻辑。流程是这样的:

  1. 当 Windows 服务即将关闭时,我们必须以某种方式停止处理更多消息
  2. 进程应该等待当时正在处理的消息完成
  3. 现在我们可以调用queueClient.CloseAsync().

不幸的是,没有停止接受更多消息的标准机制,所以我们必须自己构建。有一个Azure 反馈项目要求这样做,但它仍处于打开状态。

我根据此文档示例提出了以下实现上述流程的解决方案:

QueueClient queueClient;
CancellationTokenSource cts = new CancellationTokenSource();
ActionBlock<Message> actionBlock;

async Task Main()
{
    // Define message processing pipeline
    actionBlock = new ActionBlock<Message>(ProcessMessagesAsync, new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10,
        MaxDegreeOfParallelism = 10
    });
    
    queueClient = new QueueClient("Endpoint=sb:xxx", "test");

    RegisterOnMessageHandlerAndReceiveMessages(cts.Token);

    Console.WriteLine("Press [Enter] to stop processing messages");
    Console.ReadLine();
    
    // Signal the message handler to stop processing messages, step 1 of the flow
    cts.Cancel();
    
    // Signal the processing pipeline that no more message will come in,  step 1 of the flow
    actionBlock.Complete();
    
    // Wait for all messages to be done before closing the client, step 2 of the flow
    await actionBlock.Completion;
        
    await queueClient.CloseAsync(); // step 3 of the flow
    Console.ReadLine();
}

void RegisterOnMessageHandlerAndReceiveMessages(CancellationToken stoppingToken)
{
    // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
    var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
    {
        // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
        // Set it according to how many messages the application wants to process in parallel.
        MaxConcurrentCalls = 10,

        // Indicates whether the message pump should automatically complete the messages after returning from user callback.
        // False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
        AutoComplete = false
    };

    // Register the function that processes messages.
    queueClient.RegisterMessageHandler(async (msg, token) =>
    {
        // When the stop signal is given, do not accept more messages for processing
        if(stoppingToken.IsCancellationRequested)
            return;
            
        await actionBlock.SendAsync(msg);
        
    }, messageHandlerOptions);
}

async Task ProcessMessagesAsync(Message message)
{
    Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");

    // Process the message.
    await Task.Delay(5000);
    
    // Complete the message so that it is not received again.
    // This can be done only if the queue Client is created in ReceiveMode.PeekLock mode (which is the default).
    await queueClient.CompleteAsync(message.SystemProperties.LockToken);

    Console.WriteLine($"Completed message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
}

Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
    var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
    Console.WriteLine("Exception context for troubleshooting:");
    Console.WriteLine($"- Endpoint: {context.Endpoint}");
    Console.WriteLine($"- Entity Path: {context.EntityPath}");
    Console.WriteLine($"- Executing Action: {context.Action}");
    return Task.CompletedTask;
}
Run Code Online (Sandbox Code Playgroud)

  • 顺便说一句,如果这个答案适合你,给我一个松饼,它们看起来很好吃;-) (2认同)