我如何防止CPU的“最大化”:异步调用多个工作进程并使用SemaphoreSlim进行限制的同步方法?

use*_*426 9 c# wcf asynchronous semaphore async-await

我目前正在优化现有的,非常慢的和超时的生产应用程序。没有选择来重写它

简而言之,这是一个WCF服务,当前依次调用其他四个“工作者” WCF服务。任何一个工人服务都不依赖于另一个的结果。因此,我们希望一次全部调用它们(而不是顺序调用)。我要重申的是,我们没有重写的奢望。

在此处输入图片说明

优化涉及使其立即调用所有工作者服务。这是想到异步的地方。

我在异步编程方面的经验有限,但是对于我的解决方案,我已经就该主题进行了尽可能多的阅读。

问题是,在测试中,它可以工作,但使我的CPU耗尽。谢谢您的帮助

以下是主要WCF服务中基本代码的简化版本

// The service operation belonging to main WCF Service
public void ProcessAllPendingWork()
{
    var workerTasks = new List<Task<bool>>();
    foreach(var workerService in _workerServices)
    {
        //DoWorkAsync is the worker method with the following signature:
        // Task<bool> DoWorkAsync()

        var workerTask = workerService.DoWorkAsync()
        workerTasks.Add(workerTask);
    }

    var task = Task.Run(async ()=>
    {
        await RunWorkerTasks(workerTasks);
    });
    task.Wait();


}

private async RunWorkerTasks(IEnumerable<Tast<bool>> workerTasks)
{
    using(var semaphore = new SemaphoreSlim(initialCount:3))
    {

        foreach (var workerTask in workerTasks)
        {
            await semaphore.WaitAsync();
            try
            {
                await workerTask;
            }
            catch (System.Exception)
            {
                //assume 'Log' is a predefined logging service
                Log.Error(ex);
            }
        }
    }
} 
Run Code Online (Sandbox Code Playgroud)

我读过的东西:

如何限制并行任务处理的多种方法

如何限制并发异步I / O操作的数量?

C#中限制异步方法的方法

在C#中约束并发线程

使用信号量限制并发线程数

使用ChannelFactory和CreateChannel进行异步WCF调用

ziv*_*kan 9

您没有解释如何限制并发调用。您是要运行30个并发工作程序任务,还是要运行30个WCF调用,每个调用都有其所有工作程序任务并发运行,或者您希望并发WCF调用对每个任务都有自己的并发工作任务限制?假设您说每个WCF调用只有4个工作任务,并查看示例代码,我假设您希望全局限制为30个并发工作任务。

首先,正如@mjwills所暗示的那样,您需要使用SemaphoreSlim来限制对的调用workerService.DoWorkAsync()。您的代码当前启动所有这些代码,并且仅试图限制您要等待的数量。我认为这就是为什么要最大程度地使用CPU。启动的工作程序任务数不受限制。但是请注意,在按住信号灯的同时,您还需要等待辅助任务,否则,您将仅限制创建任务的速度,而不必限制同时执行的任务。

其次,为每个WCF请求创建一个新的SemaphoreSlim。因此,我在第一段中的问题。限制一切的唯一方法是,如果您的工人服务数量超过初始数量(样本中为30),但是您说只有4个工人。要具有“全局”限制,您需要使用单例SemaphoreSlim。

通常,您永远不会调用.Release()SemaphoreSlim,因此,如果您使它成为单例,则自该过程启动以来,一旦启动30个工作程序,您的代码就会挂起。确保在try-finally块中执行此操作,这样,如果工作程序崩溃,它仍然会被释放。

这是一些草率编写的示例代码:

public async Task ProcessAllPendingWork()
{
    var workerTasks = new List<Task<bool>>();
    foreach(var workerService in _workerServices)
    {
        var workerTask = RunWorker(workerService);
        workerTasks.Add(workerTask);
    }

    await Task.WhenAll(workerTasks);
}

private async Task<bool> RunWorker(Func<bool> workerService)
{
    // use singleton semaphore.
    await _semaphore.WaitAsync();
    try
    {
        return await workerService.DoWorkAsync();
    }
    catch (System.Exception)
    {
        //assume error is a predefined logging service
        Log.Error(ex);
        return false; // ??
    }
    finally
    {
        _semaphore.Release();
    }
}
Run Code Online (Sandbox Code Playgroud)


sa.*_*.he 3

除非我错过了什么 - 你的示例代码并行运行所有工作人员。当调用“workerService.DoWorkAsync()”时,工作线程开始工作。“RunWorkerTasks”仅等待工作任务完成。“DoWorkAsync()”启动异步操作,而“await”暂停执行调用方法,直到等待的任务完成。

CPU 使用率高的事实很可能是由于您的workerService 的活动而不是您调用它们的方式造成的。为了验证这一点,请尝试替换workerService.DoWorkAsync()Thread.Sleep(..)Task.Delay(..)。如果你的 CPU 使用率下降,那就是工作人员的责任。(取决于workerService的作用)一旦并行运行它们,CPU消耗可能会增加,甚至是预期的。

回答您如何限制并行执行的问题。请注意,以下示例并不完全使用 3 个线程,而是最多使用 3 个线程。

    Parallel.ForEach(
        _workerServices,
        new ParallelOptions { MaxDegreeOfParallelism = 3 },
        workerService => workerService.DoWorkAsync()
            .ContinueWith(res => 
            {
                // Handle your result or possible exceptions by consulting res.
            })
            .Wait());
Run Code Online (Sandbox Code Playgroud)

正如您之前提到的,您的代码是按顺序执行的,我假设工作人员也有一个非异步等效项。使用这些可能更容易。同步调用异步方法通常很麻烦。我什至仅仅通过调用就遇到了死锁情况DoWorkAsync().Wait()关于如何同步运行 async Task<T> 方法已有很多讨论。。本质上我试图避免它。如果这是不可能的,我会尝试使用ContinueWith增加复杂性的方法,或者AsyncHelper使用前面的 SO 讨论。

    var results = new ConcurrentDictionary<WorkerService, bool>();
    Parallel.ForEach(
        _workerServices,
        new ParallelOptions { MaxDegreeOfParallelism = 3 },
        workerService => 
            {
                // Handle possible exceptions via try-catch.
                results.TryAdd(workerService, workerService.DoWork());
            });
    // evaluate results
Run Code Online (Sandbox Code Playgroud)

Parallel.ForEach利用线程或任务池。这意味着它将给定参数的每次执行分派Action<TSource> body到专用线程上。您可以使用以下代码轻松验证这一点。如果Parallel.ForEach已经将工作分派到不同的线程上,您可以简单地同步执行“昂贵”的操作。任何异步操作都是不必要的,甚至会对运行时性能产生不良影响。

    Parallel.ForEach(
        Enumerable.Range(1, 4),
        m => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));
Run Code Online (Sandbox Code Playgroud)

这是我用于测试的演示项目,它不依赖于您的workerService。

    private static bool DoWork()
    {
        Thread.Sleep(5000);
        Console.WriteLine($"done by {Thread.CurrentThread.ManagedThreadId}.");
        return DateTime.Now.Millisecond % 2 == 0;
    }

    private static Task<bool> DoWorkAsync() => Task.Run(DoWork);

    private static void Main(string[] args)
    {
        var sw = new Stopwatch();
        sw.Start();

        // define a thread-safe dict to store the results of the async operation
        var results = new ConcurrentDictionary<int, bool>();

        Parallel.ForEach(
            Enumerable.Range(1, 4), // this replaces the list of workers
            new ParallelOptions { MaxDegreeOfParallelism = 3 },
            // m => results.TryAdd(m, DoWork()), // this is the alternative synchronous call
            m => DoWorkAsync().ContinueWith(res => results.TryAdd(m, res.Result)).Wait());

        sw.Stop();

        // print results
        foreach (var item in results)
        {
            Console.WriteLine($"{item.Key}={item.Value}");
        }

        Console.WriteLine(sw.Elapsed.ToString());
        Console.ReadLine();
    }
Run Code Online (Sandbox Code Playgroud)