Dav*_*ury 0 c# parallel-processing task-parallel-library async-await
我最近发布了另一个问题,关于如何处理数据库中的项目列表,如果进程失败,则重试3次.
问题可以在这里找到:C#处理循环中的大项目列表,如果失败则重试
我对这个问题的答案做了一些修改,这里是代码:
我有一个继承ApiController类的抽象类,我的所有Web Api控制器都继承了ApiBaseController:
在ApiBaseController我已经定义了使用Repository模式的UnitOfWork以便CRUD SQL数据库.UnitOfWork和存储库模式工作正常,我测试了它,我没有遇到任何问题.
public abstract class ApiBaseController : ApiController
{
protected UnitOfWork Uow { get; set; }
protected override void Dispose(bool disposing)
{
if (Uow != null && Uow is IDisposable)
{
((IDisposable)Uow).Dispose();
Uow = null;
}
base.Dispose(disposing);
}
}
Run Code Online (Sandbox Code Playgroud)
在这里,我有一个JobController继承ApiBaseController继承的东西ApiController,使其成为Web Api控制器.
该控制器有一个端点api/endpoint/sendrequests,当被调用时,它将Jobs从数据库中获取所有数据并以10个批次处理它们.
该方法ProcessTaskAsync将处理从数据库检索的每个单独任务,如果它失败,那么它将尝试再处理2次,直到该任务被忽略.
除了在ProcessTaskAsync处理任务之后我尝试使用UnitOfWork将任务的结果保存到数据库之外,一切都工作得很好await Uow.Results.AddAsync(result);.在这里它失败了!问题是Uow对象是null,我不明白为什么.我的想法是因为任务是异步处理的,控制器执行结束意味着控制器被处理,因此UnitOfWork.
知道为什么Uow是null并且我该如何解决这个问题?
[AllowAnonymous]
[RoutePrefix("api/endpoint")]
public class JobController : ApiBaseController
{
private const int PAGED_LIST_SIZE = 10;
private const int MAX_RETRY_COUNT = 3;
public JobController()
{
Uow = new UnitOfWork();
}
[HttpPost]
[AllowAnonymous]
[Route("sendrequests")]
public async Task<IHttpActionResult> SendRequests()
{
try
{
var items = await Uow.Jobs.GetAllAsync();
await ProcessTasks(items);
return Ok();
}
catch (Exception ex)
{
return BadRequest();
}
}
private async Task ProcessTasks(List<Job> items)
{
var pagedResults = items.Paged<Job>(PAGED_LIST_SIZE);
if (pagedResults != null && pagedResults.Count() > 0)
{
foreach (var collection in pagedResults)
{
List<Task> tasks = new List<Task>();
foreach (var item in collection.ToList())
{
var task = Task.Factory.StartNew(() => ProcessTaskAsync(item));
tasks.Add(task);
}
try
{
await Task.WhenAll(tasks.ToArray());
}
catch (Exception ez)
{
//log here the failed ones
}
}
}
}
private async void ProcessTaskAsync(Job item)
{
if (item.Processed >= MAX_RETRY_COUNT)
{
Debug.WriteLine(string.Format("Max count {0} reached for task: {1} ", item.Processed.ToString(), item.Name));
return;
}
item.Processed++;
try
{
Result result = await item.Process();
if (result != null)
{
await Uow.Results.AddAsync(result);
}
Debug.WriteLine(string.Format("working item name: {0}", item.Name));
}
catch (Exception ex)
{
Debug.WriteLine(string.Format("Exception occured: {0}", ex.Message.ToString()));
ProcessTaskAsync(item);
}
}
Run Code Online (Sandbox Code Playgroud)
这用于将List作为List返回页面返回10.这位于静态类中.
public static IEnumerable<IEnumerable<T>> Paged<T>(this IEnumerable<T> enumerable, int chunkSize = 10)
{
int itemsReturned = 0;
var list = enumerable.ToList();
while (itemsReturned < list.Count)
{
int currentChunkSize = Math.Min(chunkSize, list.Count - itemsReturned);
yield return list.GetRange(itemsReturned, currentChunkSize);
itemsReturned += currentChunkSize;
}
}
Run Code Online (Sandbox Code Playgroud)
private async void ProcessTaskAsync(Job item)
Run Code Online (Sandbox Code Playgroud)
实际的工作方法必须返回Task,你可以等待.async void很可能只适用于顶级UI事件,绝对不是值得期待的工作.
你产生这样的工作Task.Factory.StartNew并等待结果.但是你正在等待的只是开始工作的外部任务(而且没有内部任务,它是void).我认为你的Controller在工作方法使用你的UoW实例之前就已经处理好了- 因为你实际上并没有在Controller方法中等待这项工作.
你应该做的是摆脱Task.Factory.StartNew(用于并行工作,而不是异步),并在实际工作完成后使ProcessTaskAsync返回成为Task信号.然后你可以等待那些Tasks WhenAll,它应该按预期工作.
asyncStephen Cleary的博客可能是相关内容的最佳参考/资源.我强烈建议您阅读TPL文章http://blog.stephencleary.com/