使批量发送电子邮件并行的最佳方法

Sil*_*tre 3 .net c# parallel-processing task-parallel-library .net-4.5

我是TPL(任务并行库)的新手,我很难配置我的进程并行运行任务.

我正在开发一个发送批量电子邮件的应用程序(例如每分钟数千个,这就是这个想法),但当我看到处理器性能时,它并不好:我很确定有很多开销因为我没有使用任务库正确.

这是我的代码:

public async void MainProcess()
{
    var batches = emailsToProcess.Batch(CONST_BATCHES_SIZE);

    foreach (var batch in batches.AsParallel()
        .WithDegreeOfParallelism(Environment.ProcessorCount))
    {
         await Task.WhenAll(from emailToProcess in batch 
                    select ProcessSingleEmail(emailToProcess));
        _emailsToProcessRepository.MarkBatchAsProcessed(batch);
    }
}

private async Task ProcessSingleEmail(EmailToProcess emailToProcess)
{
    try
    {
        MailMessage mail = GetMail(emailToProcess); //static light method
        await _smtpClient.SendAsync(sendGridMail);
        emailToProcess.Processed = true;
    }
    catch (Exception e)
    {
        _logger.Error(ErrorHelper.GetExceptionMessage(e, 
                    string.Format("Error sending Email ID #{0} : ", 
                    emailToProcess.Id)), e);
    }
}
Run Code Online (Sandbox Code Playgroud)

(我知道它可能看起来很糟糕:请随意烤我☺)

我需要它以这种方式行事:我需要批处理一些记录(顺便说一句,我使用的库允许我使用"批处理"方法),因为我需要将一批记录标记为在进程完成发送时在数据库中处理.

这个过程实际上正在做我想要的事情:除了地狱之外很.正如你在perfmon中看到的那样,处理器的工作能力不是很高:

在此输入图像描述

最好的方法是什么?有什么建议?

编辑:我意识到我所拥有的是一个开销问题.是否有任何工具或简单的方法来检测和纠正它们?

Ned*_*nov 6

您所做的不是CPU限制,而是I/O限制,因此如果处理器可能会影响您的性能,则使用将并发任务的数量限制为该数字.尝试并行启动更多任务.

例如,下面的代码将异步处理所有电子邮件,但并行限制为100封电子邮件.它使用ForEachAsync扩展方法进行处理,该方法允许使用参数限制并行度,因此我将尝试尝试使该参数更大.

如果可能,您可能还希望使MarkBatchAsProcessed方法异步,因为这可能也会限制性能.

public static class Extensions
{
    public static async Task ExecuteInPartition<T>(IEnumerator<T> partition, Func<T, Task> body)
    {
        using (partition)
            while (partition.MoveNext())
                await body(partition.Current);
    }

    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select ExecuteInPartition(partition, body));
    }
}

public Task MainProcess()
{
    // Process 100 emails at a time
    return emailsToProcess.ForEachAsync(100, async (m) =>
    {
        await ProcessSingleEmail(m);                
    });

    _emailsToProcessRepository.MarkBatchAsProcessed(emailsToProcess);
}
Run Code Online (Sandbox Code Playgroud)

您还应该避免使用void返回的异步方法,它们不会传播异常,也不能组合或等待,并且它们主要用于事件处理程序,因此我更改MainProcess为返回Task.

更新

上面代码中的数字100表示​​在任何时候最多会有100个并发任务,所以它更像是一个滑动窗口而不是一个批处理.如果您想批量处理电子邮件,可以执行以下操作(假设批次具有Count属性:

public async Task MainProcess()
{
    var batches = emailsToProcess.Batch(CONST_BATCHES_SIZE);

    foreach (var batch in batches)
    {
         return batch.ForEachAsync(batch.Count, async (m) =>
         {
             await ProcessSingleEmail(m);                
         });

       _emailsToProcessRepository.MarkBatchAsProcessed(batch);             
    }
}
Run Code Online (Sandbox Code Playgroud)