如何指定Parallel.ForEach中执行的并行任务数?

Mic*_* B. 2 .net c# parallel-processing parallel.foreach

我有~500个任务,每个任务需要约5秒,大部分时间浪费在等待远程资源回复上.我想定义应该自己生成的线程数(经过一些测试)并在这些线程上运行任务.当一个任务完成时,我想在可用的线程上生成另一个任务.

我发现System.Threading.Tasks最容易实现我想要的,但我认为不可能指定应该并行执行的任务数量.对于我的机器,它总是大约8(四核cpu).是否有可能以某种方式告诉应该并行执行多少任务?如果不是最简单的方法来实现我想要的东西?(我试过线程,但代码要复杂得多).我尝试增加MaxDegreeOfParallelism参数,但它只限制了最大数量,所以这里没有运气......

这是我目前的代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        private static List<string> _list = new List<string>();
        private static int _toProcess = 0;

        static void Main(string[] args)
        {   
            for (int i = 0; i < 1000; ++i)
            {
                _list.Add("parameter" + i);
            }

            var w = new Worker();
            var w2 = new StringAnalyzer();

            Parallel.ForEach(_list, new ParallelOptions() { MaxDegreeOfParallelism = 32 }, item =>
            {
                ++_toProcess;
                string data = w.DoWork(item);
                w2.AnalyzeProcessedString(data);
            });

            Console.WriteLine("Finished");           
            Console.ReadKey();
        }

        static void Done(Task<string> t)
        {            
            Console.WriteLine(t.Result);
            --_toProcess;
        }
    }

    class Worker
    {
        public string DoWork(string par)
        {
            // It's a long running but not CPU heavy task (downloading stuff from the internet)
            System.Threading.Thread.Sleep(5000);            
            return par + " processed";
        }
    }

    class StringAnalyzer
    {
        public void AnalyzeProcessedString(string data)
        {
            // Rather short, not CPU heavy
            System.Threading.Thread.Sleep(1000);
            Console.WriteLine(data + " and analyzed");
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

L.B*_*L.B 6

假设您可以在获取资源时使用本机异步方法HttpClient.GetStringAsync,

int numTasks = 20;
SemaphoreSlim semaphore = new SemaphoreSlim(numTasks);
HttpClient client = new HttpClient();

List<string> result = new List<string>();
foreach(var url in urls)
{
    semaphore.Wait();

    client.GetStringAsync(url)
          .ContinueWith(t => {
              lock (result) result.Add(t.Result);
              semaphore.Release();
          });
}

for (int i = 0; i < numTasks; i++) semaphore.Wait();
Run Code Online (Sandbox Code Playgroud)

由于GetStringAsync 内部使用IO完成端口(与大多数其他异步IO方法一样)而不是创建新线程,因此这可以是您所追求的解决方案.

另见http://blog.stephencleary.com/2013/11/there-is-no-thread.html