使用多个任务从大型集合中检索所有记录

Chr*_*ght 10 c# concurrency multithreading task

我正在开发一个调用外部服务的应用程序,并且必须将外部集合的所有条目添加到本地集合中.目前的问题是外部集合可能超过1000条记录,但返回的搜索结果最多只能包含20个项目.

为了速度,我认为使用一组任务将是前进的方向,所以我想出了下面的代码:

int totalCount = returnedCol.total_count;
        while (totalCount > myDict.Count)
        {
            int numberOfTasks = // logic to calculate how many tasks to run

            List<Task> taskList = new List<Task>();

            for (int i = 1; i <= numberOfTasks; i++)
            {
                Interlocked.Add(ref pageNumber, pageSize);

                Task<SearchResponse> testTask = Task.Run(() =>
                {
                    return ExternalCall.GetData(pageNumber, pageSize);
                });

                Thread.Sleep(100);

                taskList.Add(testTask);
                testTask.ContinueWith(o =>
                {
                    foreach (ExternalDataRecord dataiwant in testTask.Result.dataiwant)
                    {
                        if (!myDict.ContainsKey(dataiwant.id))
                            myDict.GetOrAdd(dataiwant.id, dataiwant);
                    }
                });
            }
            Task.WaitAll(taskList.ToArray());
        }
Run Code Online (Sandbox Code Playgroud)

但是,这不会产生所有结果.该pageNumber变量正确每次递增,但似乎不是所有的任务结果正在分析(如在上一个较小的数据的单个线程相同的逻辑设置返回所有预期的结果).此外,我尝试在链(而不是循环)中声明单个任务,并且全部返回测试数据.似乎我传入的值Thread.Sleep()越高,结果被添加到本地集合中越多(但这并不理想,因为这意味着该过程需要更长时间!)

目前在600个记录的样本中,我只有大约150-200个添加到该myDict集合中.我错过了一些明显的东西吗

Ber*_*sch 1

您错过了导致另一项任务的事实ContinueWith(),并且您没有添加您的taskList.

更好的方法是使用.NET 4.5 以来可用的async/ 。await它提供了一种不太繁重的解决方案。

您可以将算法更改为更像这样:

public async Task Process()
{
    int totalCount = returnedCol.total_count;

    while (totalCount > myDict.Count)
    {
        int numberOfTasks = // logic to calculate how many tasks to run

        List<Task> taskList = new List<Task>();

        for (int i = 1; i <= numberOfTasks; i++)
        {
            Interlocked.Add(ref pageNumber, pageSize);

            taskList.Add(ProcessPage(pageNumber, pageSize));
        }

        await Task.WhenAll(taskList.ToArray());
    }
 }

 private async Task ProcessPage(int pageNumber, int pageSize)
 {
       SearchResponse result = await Task.Run(() => 
           ExternalCall.GetData(pageNumber, pageSize)).ConfigureAwait(false);

       foreach (ExternalDataRecord dataiwant in result.dataiwant)
       {
           myDict.GetOrAdd(dataiwant.id, dataiwant);
       }
 }
Run Code Online (Sandbox Code Playgroud)

该关键字告诉编译器稍后 async会有。本质上处理您通话的详细信息。如果您确实希望在另一个任务中发生这种情况,那么您只需获取该调用的结果即可。awaitawaitContinueWithExternalCallawait

  • “ProcessPage”内部没有任何“await”,因此它不会是异步的。他的代码不会并行运行,因为“ProcessPage”永远不会在工作完成之前提前返回。您需要 `SearchResponse result = wait Task.Run(() =&gt;ExternalCall.GetData(pageNumber, pageSize)).ConfigureAwait(false);` 或更好的 `SearchResponse result = waitExternalCall.GetDataAsync(pageNumber, pageSize).ConfigureAwait (假);`如果可用的话。作为外部 Web 服务调用,您应该能够轻松地使其异步,而无需使用 `Task.Run(`。 (2认同)