use*_*426 9 c# wcf asynchronous semaphore async-await
我目前正在优化现有的,非常慢的和超时的生产应用程序。没有选择来重写它。
简而言之,这是一个WCF服务,当前依次调用其他四个“工作者” WCF服务。任何一个工人服务都不依赖于另一个的结果。因此,我们希望一次全部调用它们(而不是顺序调用)。我要重申的是,我们没有重写的奢望。
优化涉及使其立即调用所有工作者服务。这是想到异步的地方。
我在异步编程方面的经验有限,但是对于我的解决方案,我已经就该主题进行了尽可能多的阅读。
问题是,在测试中,它可以工作,但使我的CPU耗尽。谢谢您的帮助
以下是主要WCF服务中基本代码的简化版本
// The service operation belonging to main WCF Service
public void ProcessAllPendingWork()
{
var workerTasks = new List<Task<bool>>();
foreach(var workerService in _workerServices)
{
//DoWorkAsync is the worker method with the following signature:
// Task<bool> DoWorkAsync()
var workerTask = workerService.DoWorkAsync()
workerTasks.Add(workerTask);
}
var task = Task.Run(async ()=>
{
await RunWorkerTasks(workerTasks);
});
task.Wait();
}
private async RunWorkerTasks(IEnumerable<Tast<bool>> workerTasks)
{
using(var semaphore = new SemaphoreSlim(initialCount:3))
{
foreach (var workerTask in workerTasks)
{
await semaphore.WaitAsync();
try
{
await workerTask;
}
catch (System.Exception)
{
//assume 'Log' is a predefined logging service
Log.Error(ex);
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
我读过的东西:
您没有解释如何限制并发调用。您是要运行30个并发工作程序任务,还是要运行30个WCF调用,每个调用都有其所有工作程序任务并发运行,或者您希望并发WCF调用对每个任务都有自己的并发工作任务限制?假设您说每个WCF调用只有4个工作任务,并查看示例代码,我假设您希望全局限制为30个并发工作任务。
首先,正如@mjwills所暗示的那样,您需要使用SemaphoreSlim来限制对的调用workerService.DoWorkAsync()
。您的代码当前启动所有这些代码,并且仅试图限制您要等待的数量。我认为这就是为什么要最大程度地使用CPU。启动的工作程序任务数不受限制。但是请注意,在按住信号灯的同时,您还需要等待辅助任务,否则,您将仅限制创建任务的速度,而不必限制同时执行的任务。
其次,为每个WCF请求创建一个新的SemaphoreSlim。因此,我在第一段中的问题。限制一切的唯一方法是,如果您的工人服务数量超过初始数量(样本中为30),但是您说只有4个工人。要具有“全局”限制,您需要使用单例SemaphoreSlim。
通常,您永远不会调用.Release()
SemaphoreSlim,因此,如果您使它成为单例,则自该过程启动以来,一旦启动30个工作程序,您的代码就会挂起。确保在try-finally块中执行此操作,这样,如果工作程序崩溃,它仍然会被释放。
这是一些草率编写的示例代码:
public async Task ProcessAllPendingWork()
{
var workerTasks = new List<Task<bool>>();
foreach(var workerService in _workerServices)
{
var workerTask = RunWorker(workerService);
workerTasks.Add(workerTask);
}
await Task.WhenAll(workerTasks);
}
private async Task<bool> RunWorker(Func<bool> workerService)
{
// use singleton semaphore.
await _semaphore.WaitAsync();
try
{
return await workerService.DoWorkAsync();
}
catch (System.Exception)
{
//assume error is a predefined logging service
Log.Error(ex);
return false; // ??
}
finally
{
_semaphore.Release();
}
}
Run Code Online (Sandbox Code Playgroud)
除非我错过了什么 - 你的示例代码并行运行所有工作人员。当调用“workerService.DoWorkAsync()”时,工作线程开始工作。“RunWorkerTasks”仅等待工作任务完成。“DoWorkAsync()”启动异步操作,而“await”暂停执行调用方法,直到等待的任务完成。
CPU 使用率高的事实很可能是由于您的workerService 的活动而不是您调用它们的方式造成的。为了验证这一点,请尝试替换workerService.DoWorkAsync()
为Thread.Sleep(..)
或Task.Delay(..)
。如果你的 CPU 使用率下降,那就是工作人员的责任。(取决于workerService的作用)一旦并行运行它们,CPU消耗可能会增加,甚至是预期的。
回答您如何限制并行执行的问题。请注意,以下示例并不完全使用 3 个线程,而是最多使用 3 个线程。
Parallel.ForEach(
_workerServices,
new ParallelOptions { MaxDegreeOfParallelism = 3 },
workerService => workerService.DoWorkAsync()
.ContinueWith(res =>
{
// Handle your result or possible exceptions by consulting res.
})
.Wait());
Run Code Online (Sandbox Code Playgroud)
正如您之前提到的,您的代码是按顺序执行的,我假设工作人员也有一个非异步等效项。使用这些可能更容易。同步调用异步方法通常很麻烦。我什至仅仅通过调用就遇到了死锁情况DoWorkAsync().Wait()
。关于如何同步运行 async Task<T> 方法已有很多讨论。。本质上我试图避免它。如果这是不可能的,我会尝试使用ContinueWith
增加复杂性的方法,或者AsyncHelper
使用前面的 SO 讨论。
var results = new ConcurrentDictionary<WorkerService, bool>();
Parallel.ForEach(
_workerServices,
new ParallelOptions { MaxDegreeOfParallelism = 3 },
workerService =>
{
// Handle possible exceptions via try-catch.
results.TryAdd(workerService, workerService.DoWork());
});
// evaluate results
Run Code Online (Sandbox Code Playgroud)
Parallel.ForEach
利用线程或任务池。这意味着它将给定参数的每次执行分派Action<TSource> body
到专用线程上。您可以使用以下代码轻松验证这一点。如果Parallel.ForEach
已经将工作分派到不同的线程上,您可以简单地同步执行“昂贵”的操作。任何异步操作都是不必要的,甚至会对运行时性能产生不良影响。
Parallel.ForEach(
Enumerable.Range(1, 4),
m => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));
Run Code Online (Sandbox Code Playgroud)
这是我用于测试的演示项目,它不依赖于您的workerService。
private static bool DoWork()
{
Thread.Sleep(5000);
Console.WriteLine($"done by {Thread.CurrentThread.ManagedThreadId}.");
return DateTime.Now.Millisecond % 2 == 0;
}
private static Task<bool> DoWorkAsync() => Task.Run(DoWork);
private static void Main(string[] args)
{
var sw = new Stopwatch();
sw.Start();
// define a thread-safe dict to store the results of the async operation
var results = new ConcurrentDictionary<int, bool>();
Parallel.ForEach(
Enumerable.Range(1, 4), // this replaces the list of workers
new ParallelOptions { MaxDegreeOfParallelism = 3 },
// m => results.TryAdd(m, DoWork()), // this is the alternative synchronous call
m => DoWorkAsync().ContinueWith(res => results.TryAdd(m, res.Result)).Wait());
sw.Stop();
// print results
foreach (var item in results)
{
Console.WriteLine($"{item.Key}={item.Value}");
}
Console.WriteLine(sw.Elapsed.ToString());
Console.ReadLine();
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
427 次 |
最近记录: |