等待列表中的所有任务,然后根据任务结果更新列表

Ras*_*kov 3 c# async-await

我想用WhenAll(不是一个一个)运行所有任务.但之后我需要根据结果更新列表(LastReport属性).

我想我有解决方案,但我想检查是否有更好的方法.

想法是:

  • 运行所有任务
  • 记住配置和任务之间的关系
  • 更新配置

我的解决方案是:

var lastReportAllTasks = new List<Task<Dictionary<string, string>>>();
var configurationTaskRelation = new Dictionary<int, Task<Dictionary<string, string>>>();
foreach (var configuration in MachineConfigurations)
{
    var task = machineService.GetReports(configuration);
    lastReportAllTasks.Add(task);
    configurationTaskRelation.Add(configuration.Id, task);
}

await Task.WhenAll(lastReportAllTasks);
foreach (var configuration in MachineConfigurations)
{
    var lastReportTask = configurationTaskRelation[configuration.Id];
    configuration.LastReport = await lastReportTask;
}
Run Code Online (Sandbox Code Playgroud)

Pan*_*vos 5

Select函数本身可以是异步的.您可以等待报告并返回配置并返回相同的结果对象(匿名类型或元组,无论您喜欢什么):

var tasks=MachineConfigurations.Select(async conf=>{
                       var report= await machineService.GetReports(conf);
                       return new {conf,report});
var results=await Task.WhenAll(tasks);
foreach(var pair in results)
{
    pair.conf.LastReport=pair.report;
}
Run Code Online (Sandbox Code Playgroud)

编辑 - 循环和错误处理

正如Servy建议的那样,Task.WhenAll可以省略,等待可以在循环中移动:

foreach(var task in tasks)
{   
    var pair=await task;
    pair.conf.LastReport=pair.report;
}
Run Code Online (Sandbox Code Playgroud)

任务仍将同时执行.但是在异常情况下,某些配置对象将被修改而某些配置对象不会被修改.

通常,这将是一个丑陋的情况,需要额外的异常处理代码来清理修改后的对象.当修改完成的异常处理是一个容易得多上的侧并最终定稿/幸福的路径完成时应用.这是更新内部配置对象的一个​​原因,Select()需要仔细考虑.

这种特殊情况下,尽管"跳过"失败的报告可能更好,但可能会将它们移到错误队列中并在以后重新处理它们.只要预期会出现这种情况,那么获得部分结果可能比没有任何结果更好:

foreach(var task in tasks)
{   
    try
    {
        var pair=await task;
        pair.conf.LastReport=pair.report;
    }
    catch(Exception exc)
    {
        //Make sure the error is logged
        Log.Error(exc);
        ErrorQueue.Enqueue(new ProcessingError(conf,ex);
    }
}
//Handle errors after the loop
Run Code Online (Sandbox Code Playgroud)

编辑2 - 数据流

为了完整起见,我做的有几千个售票报告每天产生,每个GDS调用(通过每一个旅行社出售门票的服务)需要相当长的时间.我无法同时运行所有请求 - 如果我尝试超过10个并发请求,我会开始收到服务器序列化错误.我也无法重试一切.

在这种情况下,我使用了TPL DataFlow和一些面向铁路的编程技巧.DOP为8的ActionBlock处理故障单请求.结果包含在一个Success类中并发送到下一个块.失败的请求和异常包含在Failure类中并发送到另一个块.这两个类都继承自IFlowEnvelope,它有一个Successful标志.是的,这是F#Disriminated Union羡慕.

这与超时等的一些重试逻辑相结合.

在伪代码中,管道如下所示:

var reportingBlock=new TransformBlock<Ticket,IFlowEnvelope<TicketReport>(reportFunc,dopOptions);
var happyBlock = new ActionBlock<IFlowEnvelope<TicketReport>>(storeToDb);
var errorBlock = new ActionBlock<IFlowEnvelope<TicketReport>>(logError);

reportingBlock.LinkTo(happyBlock,linkOptions,msg=>msg.Success);
reportingBlock.LinkTo(errorBlock,linkOptions,msg=>!msg.Success);

foreach(var ticket in tickets)
{
    reportingBlock.Post(ticket);
}
Run Code Online (Sandbox Code Playgroud)

reportFunc捕获任何异常并将它们包装为Failure<T>对象:

async Task<IFlowEnvelope<Ticket,TicketReport>> reportFunc(Ticket ticket)
{
    try
    {
        //Do the heavy processing
        return new Success<TicketReport>(report);
    }
    catch(Exception exc)
    {
        //Construct an error message, msg
        return new Failure<TicketReport>(report,msg);
    }
}
Run Code Online (Sandbox Code Playgroud)

真正的管道包括解析每日报告和个人票证的步骤.每次调用GDS需要1-6秒,因此管道的复杂性是合理的.