异步/等待任务和WaitHandle

Che*_*hen 5 c# asynchronous task waithandle async-await

假设我有10N个项目(我需要通过http协议来获取它们),在代码N中开始启动任务以获取数据,每个任务依次获取10个项目。我把这些东西放在一个盒子里ConcurrentQueue<Item>。之后,将以不安全线程的方法一一处理这些项目。

async Task<Item> GetItemAsync()
{
    //fetch one item from the internet
}

async Task DoWork()
{
    var tasks = new List<Task>();
    var items = new ConcurrentQueue<Item>();
    var handles = new List<ManualResetEvent>();

    for i 1 -> N
    {
        var handle = new ManualResetEvent(false);
        handles.Add(handle);

        tasks.Add(Task.Factory.StartNew(async delegate
        {
            for j 1 -> 10
            {
                var item = await GetItemAsync();
                items.Enqueue(item);
            }
            handle.Set();
        });
    }

    //begin to process the items when any handle is set
    WaitHandle.WaitAny(handles);

    while(true)
    {
         if (all handles are set && items collection is empty) //***
           break; 
         //in another word: all tasks are really completed

         while(items.TryDequeue(out item))          
         {
              AThreadUnsafeMethod(item);    //process items one by one
         }
    }
}
Run Code Online (Sandbox Code Playgroud)

我不知道条件是否可以放在标记为的语句中***。我不能Task.IsCompleted在此处使用属性,因为我await在任务中使用了属性,因此任务很快就完成了。和bool[]指示任务是否执行到最后看起来非常难看,因为我觉得ManualResetEvent的可以做同样的工作。谁能给我一个建议?

Ste*_*ary 5

好吧,您可以自己构建它,但我认为使用TPL Dataflow 会更容易。

就像是:

static async Task DoWork()
{
  // By default, ActionBlock uses MaxDegreeOfParallelism == 1,
  //  so AThreadUnsafeMethod is not called in parallel.
  var block = new ActionBlock<Item>(AThreadUnsafeMethod);

  // Start off N tasks, each asynchronously acquiring 10 items.
  // Each item is sent to the block as it is received.
  var tasks = Enumerable.Range(0, N).Select(Task.Run(
      async () =>
      {
        for (int i = 0; i != 10; ++i)
          block.Post(await GetItemAsync());
      })).ToArray();

  // Complete the block when all tasks have completed.
  Task.WhenAll(tasks).ContinueWith(_ => { block.Complete(); });

  // Wait for the block to complete.
  await block.Completion;
}
Run Code Online (Sandbox Code Playgroud)