Gun*_*Gun 1 c# parallel-processing multithreading
我正在处理一个项目列表 (200k - 300k),每个项目的处理时间在 2 到 8 秒之间。为了赢得时间,我可以并行处理这个列表。当我处于异步上下文中时,我使用如下内容:
public async Task<List<Keyword>> DoWord(List<string> keyword)
{
ConcurrentBag<Keyword> keywordResults = new ConcurrentBag<Keyword>();
if (keyword.Count > 0)
{
try
{
var tasks = keyword.Select(async kw =>
{
return await Work(kw).ConfigureAwait(false);
});
keywordResults = new ConcurrentBag<Keyword>(await Task.WhenAll(tasks).ConfigureAwait(false));
}
catch (AggregateException ae)
{
foreach (Exception innerEx in ae.InnerExceptions)
{
log.ErrorFormat("Core threads exception: {0}", innerEx);
}
}
}
return keywordResults.ToList();
}
Run Code Online (Sandbox Code Playgroud)
关键字列表始终包含 8 个元素(来自上面),因此我按 8 个逐个处理列表,但在这种情况下,我猜测如果 3 秒内处理 7 个关键字,10 秒内处理第 8 个关键字,则总时间8 个关键字将是 10 个(如果我错了,请纠正我)。从那时起我该如何应对Parallel.Foreach?我的意思是:启动 8 个关键字,如果其中 1 个关键字完成,则再启动 1 个。在这种情况下,我将永久拥有 8 个工作流程。任何想法 ?
另一种更简单的方法是使用AsyncEnumerator NuGet Package:
using System.Collections.Async;
public async Task<List<Keyword>> DoWord(List<string> keywords)
{
var keywordResults = new ConcurrentBag<Keyword>();
await keywords.ParallelForEachAsync(async keyword =>
{
try
{
var result = await Work(keyword);
keywordResults.Add(result);
}
catch (AggregateException ae)
{
foreach (Exception innerEx in ae.InnerExceptions)
{
log.ErrorFormat("Core threads exception: {0}", innerEx);
}
}
}, maxDegreeOfParallelism: 8);
return keywordResults.ToList();
}
Run Code Online (Sandbox Code Playgroud)