Running tasks in parallel in groups

X X*_*X X 2 c# parallel-processing task-parallel-library async-await

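I will describe my problem with a simple example first, and then with one that is closer to the real case.

Imagine box1 holds n items [i1, i2, i3, i4, ..., in], and box2 can process m items at a time (m is usually much smaller than n). The time each item needs is different. I want m items to be in progress at all times until every item has been processed.

A closer problem: you have a list, list1, of n strings (URLs), and we want a system that downloads m files concurrently (for example via the HttpClient.GetAsync() method). Whenever one of the m downloads finishes, it must be replaced as soon as possible by another remaining item from list1, and this must continue until all items in list1 have been processed. (n and m are specified by the user at run time.)

How can this be done?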

Sco*_*ain 6

You should look into TPL Dataflow. Add the System.Threading.Tasks.Dataflow NuGet package to your project, and then what you want is as simple as:

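using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Newtonsoft.Json; // JsonConvert comes from the Json.NET package

private static HttpClient _client = new HttpClient();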
public async Task<List<MyClass>> ProcessDownloads(IEnumerable<string> uris, 
                                                  int concurrentDownloads)
{
    var result = new List<MyClass>();

    var downloadData = new TransformBlock<string, string>(async uri =>
    {
        return await _client.GetStringAsync(uri); //GetStringAsync is a thread safe method.
    }, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = concurrentDownloads});

    var processData = new TransformBlock<string, MyClass>(
          json => JsonConvert.DeserializeObject<MyClass>(json), 
          new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded});

    var collectData = new ActionBlock<MyClass>(
          data => result.Add(data)); //When you don't specify options, dataflow processes items one at a time.

    //Set up the chain of blocks so that each block calls `.Complete()` on the next block when it finishes processing its last item.
    downloadData.LinkTo(processData, new DataflowLinkOptions {PropagateCompletion = true});
    processData.LinkTo(collectData, new DataflowLinkOptions {PropagateCompletion = true});

    //Load the data into the first transform block to start off the process.
    foreach (var uri in uris)
    {
        await downloadData.SendAsync(uri).ConfigureAwait(false);
    }
    downloadData.Complete(); //Signal you are done adding data.

    //Wait for the last object to be added to the list.
    await collectData.Completion.ConfigureAwait(false);

    return result;
}

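In the code above, at most concurrentDownloads requests are in flight on the shared HttpClient at any given time, an unbounded number of threads can process the received strings and turn them into objects, and a single thread takes those objects and adds them to the list.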
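To see the throttling described above without touching the network, here is a minimal sketch under stated assumptions. `ThrottledPipelineDemo`, `RunAsync`, and the `Task.Delay` stand-in for the HTTP call are hypothetical and not part of the answer; only the Dataflow types and options are the real library API. It records the highest number of simulated downloads that were active at once.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottledPipelineDemo
{
    // Hypothetical helper: pushes the URIs through a throttled TransformBlock and
    // returns the collected results plus the peak number of simultaneous "downloads".
    public static async Task<(List<string> Results, int PeakConcurrency)> RunAsync(
        IEnumerable<string> uris, int concurrentDownloads)
    {
        var results = new List<string>();
        var gate = new object();
        int active = 0, peak = 0;

        var download = new TransformBlock<string, string>(async uri =>
        {
            lock (gate) { active++; peak = Math.Max(peak, active); }
            await Task.Delay(20).ConfigureAwait(false); // stand-in for _client.GetStringAsync(uri)
            lock (gate) { active--; }
            return "body of " + uri;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = concurrentDownloads });

        // Default options: one item at a time, so the plain List<string> is safe here.
        var collect = new ActionBlock<string>(body => results.Add(body));
        download.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var uri in uris)
        {
            await download.SendAsync(uri).ConfigureAwait(false);
        }
        download.Complete();
        await collect.Completion.ConfigureAwait(false);

        return (results, peak);
    }
}
```

Calling `await ThrottledPipelineDemo.RunAsync(uris, 3)` should report a peak of at most 3. Because a TransformBlock emits its outputs in input order by default, the results also come back in the order the URIs were sent.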

Update: here is a simpler example that does only what you asked for in the question.

private static HttpClient _client = new HttpClient();
public void ProcessDownloads(IEnumerable<string> uris, int concurrentDownloads)
{
    var downloadData = new ActionBlock<string>(async uri =>
    {
        var response = await _client.GetAsync(uri); //GetAsync is a thread safe method.
        //do something with response here.
    }, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = concurrentDownloads});

    foreach (var uri in uris)
    {
        downloadData.Post(uri);
    }
    downloadData.Complete();

    downloadData.Completion.Wait();
}
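The simplified method blocks the calling thread with `Completion.Wait()`. If the caller is itself asynchronous, a non-blocking variant could look like the sketch below. `SimpleThrottle`, `ProcessDownloadsAsync`, and the `handle` delegate are hypothetical names introduced here for illustration; `handle` stands in for the HttpClient call plus whatever is done with the response.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SimpleThrottle
{
    // Hypothetical async counterpart of the simplified ProcessDownloads:
    // at most 'concurrentDownloads' invocations of 'handle' run at the same time.
    public static async Task ProcessDownloadsAsync(
        IEnumerable<string> uris, int concurrentDownloads, Func<string, Task> handle)
    {
        var downloadData = new ActionBlock<string>(
            handle,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = concurrentDownloads });

        foreach (var uri in uris)
        {
            downloadData.Post(uri);
        }
        downloadData.Complete(); // no more input will be added

        // Await instead of Wait() so the calling thread is not blocked.
        await downloadData.Completion.ConfigureAwait(false);
    }
}
```

A caller might pass something like `uri => _client.GetAsync(uri)` wrapped in its own response handling as the `handle` argument.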


Dog*_*lan 6

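Here is a generic method you can use.

When you call it, TIn will be string (the URL), and asyncProcessor will be your async method that takes the URL as input and returns a Task.

The SemaphoreSlim used by this method allows only n concurrent async I/O requests at any time; as soon as one completes, another request starts. Something like a sliding-window pattern.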

public static Task ForEachAsync<TIn>(
    IEnumerable<TIn> inputEnumerable,
    Func<TIn, Task> asyncProcessor,
    int? maxDegreeOfParallelism = null)
{
    // DefaultMaxDegreeOfParallelism is not shown here; it is expected to be a constant defined in the containing class.
    int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
    SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

    IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
    {
        // Wait for a free slot before starting the work for this item.
        await throttler.WaitAsync().ConfigureAwait(false);
        try
        {
            await asyncProcessor(input).ConfigureAwait(false);
        }
        finally
        {
            // Free the slot even if the processor throws.
            throttler.Release();
        }
    });

    return Task.WhenAll(tasks);
}
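A possible way to call the method, sketched under assumptions: `ForEachAsyncDemo`, `MeasurePeakAsync`, the placeholder value 4 for `DefaultMaxDegreeOfParallelism`, and the `example.invalid` URLs are all hypothetical, and `Task.Delay` stands in for the real HTTP request. The method body is repeated only so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Assumption: the answer does not show this constant; 4 is an arbitrary placeholder.
    private const int DefaultMaxDegreeOfParallelism = 4;

    // Same throttling logic as the answer's method, repeated for self-containment.
    public static Task ForEachAsync<TIn>(
        IEnumerable<TIn> inputEnumerable,
        Func<TIn, Task> asyncProcessor,
        int? maxDegreeOfParallelism = null)
    {
        int max = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
        var throttler = new SemaphoreSlim(max, max);
        IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try { await asyncProcessor(input).ConfigureAwait(false); }
            finally { throttler.Release(); }
        });
        return Task.WhenAll(tasks);
    }

    // Runs 'itemCount' simulated downloads and returns the peak number in flight.
    public static async Task<int> MeasurePeakAsync(int itemCount, int maxParallel)
    {
        var gate = new object();
        int active = 0, peak = 0;
        var urls = Enumerable.Range(1, itemCount).Select(i => "https://example.invalid/" + i);

        await ForEachAsync(urls, async url =>
        {
            lock (gate) { active++; peak = Math.Max(peak, active); }
            await Task.Delay(20).ConfigureAwait(false); // stand-in for HttpClient.GetAsync(url)
            lock (gate) { active--; }
        }, maxParallel).ConfigureAwait(false);

        return peak;
    }
}
```

With `maxParallel` set to m, the returned peak should never exceed m. Note that one task per input item is created up front; the semaphore limits how many of them are doing work at a time, not how many Task objects exist.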