如何正确排队任务以在C#中运行

Mik*_*sen 19 .net c# multithreading asynchronous async-await

我有一个items(RunData.Demand)的枚举,每个都代表一些涉及通过HTTP调用API的工作.如果我只是foreach通过它并在每次迭代期间调用API,它的工作效果很好.但是,每次迭代需要一两秒钟,所以我想运行2-3个线程并在它们之间划分工作.这是我正在做的事情:

ThreadPool.SetMaxThreads(2, 5); // Trying to limit the amount of threads
var tasks = RunData.Demand
   .Select(service => Task.Run(async delegate
   {
      var availabilityResponse = await client.QueryAvailability(service);
      // Do some other stuff, not really important
   }));

await Task.WhenAll(tasks);
Run Code Online (Sandbox Code Playgroud)

client.QueryAvailability调用基本上使用HttpClient该类调用API :

public async Task<QueryAvailabilityResponse> QueryAvailability(QueryAvailabilityMultidayRequest request)
{
   var response = await client.PostAsJsonAsync("api/queryavailabilitymultiday", request);

   if (response.IsSuccessStatusCode)
   {
      return await response.Content.ReadAsAsync<QueryAvailabilityResponse>();
   }

   throw new HttpException((int) response.StatusCode, response.ReasonPhrase);
}
Run Code Online (Sandbox Code Playgroud)

这种方法很有效,但最终事情开始超时.如果我将HttpClient超时设置为一小时,那么我开始得到奇怪的内部服务器错误.

我开始做的是在QueryAvailability方法中设置秒表以查看发生了什么.

发生的事情是RunData.Demand中的所有1200个项目一次创建,并且await client.PostAsJsonAsync正在调用所有1200个方法.它似乎然后使用2个线程慢慢检查任务,所以最后我有等待9或10分钟的任务.

这是我想要的行为:

我想创建1,200个任务,然后在线程可用时一次运行3-4个任务.我希望排队1200 HTTP立即调用.

这有什么好办法吗?

i3a*_*non 21

正如我一直建议..你需要的是TPL Dataflow(安装:) Install-Package Microsoft.Tpl.Dataflow.

您创建一个ActionBlock动作以对每个项目执行.设置MaxDegreeOfParallelism为限制.开始发布并等待其完成:

var block = new ActionBlock<QueryAvailabilityMultidayRequest>(async service => 
{
    var availabilityResponse = await client.QueryAvailability(service);
    // ...
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

foreach (var service in RunData.Demand)
{
    block.Post(service);
}

block.Complete();
await block.Completion;
Run Code Online (Sandbox Code Playgroud)

  • 完美无瑕!我现在是粉丝.接受. (2认同)

Sea*_*nOB 5

古老的问题,但是我想提出一个使用SemaphoreSlim类的替代轻量级解决方案。只是参考System.Threading。

SemaphoreSlim sem = new SemaphoreSlim(4,4);

foreach (var service in RunData.Demand)
{

    await sem.WaitAsync();
    Task t = Task.Run(async () => 
    {
        var availabilityResponse = await client.QueryAvailability(serviceCopy));    
        // do your other stuff here with the result of QueryAvailability
    }
    t.ContinueWith(sem.Release());
}
Run Code Online (Sandbox Code Playgroud)

信号量充当锁定机制。您只能通过调用Wait(WaitAsync)输入信号量,该操作从计数中减去1。呼叫发布将计数加一。