Sil*_*tre 3 .net c# parallel-processing task-parallel-library .net-4.5
我是TPL(任务并行库)的新手,我很难配置我的进程并行运行任务.
我正在开发一个发送批量电子邮件的应用程序(例如每分钟数千个,这就是这个想法),但当我看到处理器性能时,它并不好:我很确定有很多开销因为我没有使用任务库正确.
这是我的代码:
public async void MainProcess()
{
var batches = emailsToProcess.Batch(CONST_BATCHES_SIZE);
foreach (var batch in batches.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount))
{
await Task.WhenAll(from emailToProcess in batch
select ProcessSingleEmail(emailToProcess));
_emailsToProcessRepository.MarkBatchAsProcessed(batch);
}
}
private async Task ProcessSingleEmail(EmailToProcess emailToProcess)
{
try
{
MailMessage mail = GetMail(emailToProcess); //static light method
await _smtpClient.SendAsync(sendGridMail);
emailToProcess.Processed = true;
}
catch (Exception e)
{
_logger.Error(ErrorHelper.GetExceptionMessage(e,
string.Format("Error sending Email ID #{0} : ",
emailToProcess.Id)), e);
}
}
Run Code Online (Sandbox Code Playgroud)
(我知道它可能看起来很糟糕:请随意烤我☺)
我需要它以这种方式行事:我需要批处理一些记录(顺便说一句,我使用的库允许我使用"批处理"方法),因为我需要将一批记录标记为在进程完成发送时在数据库中处理.
这个过程实际上正在做我想要的事情:除了地狱之外很慢.正如你在perfmon中看到的那样,处理器的工作能力不是很高:

最好的方法是什么?有什么建议?
编辑:我意识到我所拥有的是一个开销问题.是否有任何工具或简单的方法来检测和纠正它们?
您所做的不是CPU限制,而是I/O限制,因此如果处理器可能会影响您的性能,则使用将并发任务的数量限制为该数字.尝试并行启动更多任务.
例如,下面的代码将异步处理所有电子邮件,但并行限制为100封电子邮件.它使用ForEachAsync扩展方法进行处理,该方法允许使用参数限制并行度,因此我将尝试尝试使该参数更大.
如果可能,您可能还希望使MarkBatchAsProcessed方法异步,因为这可能也会限制性能.
public static class Extensions
{
public static async Task ExecuteInPartition<T>(IEnumerator<T> partition, Func<T, Task> body)
{
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select ExecuteInPartition(partition, body));
}
}
public Task MainProcess()
{
// Process 100 emails at a time
return emailsToProcess.ForEachAsync(100, async (m) =>
{
await ProcessSingleEmail(m);
});
_emailsToProcessRepository.MarkBatchAsProcessed(emailsToProcess);
}
Run Code Online (Sandbox Code Playgroud)
您还应该避免使用void返回的异步方法,它们不会传播异常,也不能组合或等待,并且它们主要用于事件处理程序,因此我更改MainProcess为返回Task.
更新
上面代码中的数字100表示在任何时候最多会有100个并发任务,所以它更像是一个滑动窗口而不是一个批处理.如果您想批量处理电子邮件,可以执行以下操作(假设批次具有Count属性:
public async Task MainProcess()
{
var batches = emailsToProcess.Batch(CONST_BATCHES_SIZE);
foreach (var batch in batches)
{
return batch.ForEachAsync(batch.Count, async (m) =>
{
await ProcessSingleEmail(m);
});
_emailsToProcessRepository.MarkBatchAsProcessed(batch);
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2902 次 |
| 最近记录: |