使用任务处理大量 HTTP 请求

Mar*_*ijn 2 .net c# task-parallel-library

因此,我在使用任务来处理 HTTP 请求负载时遇到了一些困难。

我想做的是从 WMTS 创建一个大图像。对于那些不知道的人来说,WMTS 是一种网络地图切片服务。因此基本上,您可以通过发送包含正确的tileRow 和tileColumn 的请求来请求256x256 的图像图块。因此,在本例中,我尝试构建包含数百甚至数千个此类图像图块的图像。

为此,我创建了一个应用程序:

  • 根据输入计算需要请求哪些图块。
  • 创建一个列表,我可以使用该列表向 WMTS 发出正确的 HTTP 请求。
  • 将这些请求发送到服务器并检索图像。
  • 将图像拼接成一张大图像。这就是我们想要的结果。

正如您所想象的那样,瓷砖的数量呈指数级增长。这并不会真正影响 CPU 工作,但主要是 I/O 密集型工作。因此,我认为在发送下一个请求之前,不要等待每个请求返回,而是使用任务来完成此任务。创建将处理每个单独请求的任务,并在所有任务完成后构建大图像。

所以这是我已经知道我要请求什么图块的方法。在这里,我想递归地发送带有任务的请求,直到所有数据完成(最终使用最大重试机制)。

    public Dictionary<Tuple<int, int>, Image> GetTilesParallel(List<Tuple<int, int>> tileMatrix, int retry = 0)
    {
        //The dictionary we will return
        Dictionary<Tuple<int, int>, Image> images = new Dictionary<Tuple<int, int>, Image>();

        //The dictionary that we will recursively request if tiles fail.
        List<Tuple<int, int>> failedTiles = new List<Tuple<int, int>>();

        //To track when tasks are finished
        List<Task> tasks = new List<Task>();

        foreach (var request in tileMatrix)
        {
            Tuple<int, int> imageTile = new Tuple<int, int>(request.Item1, request.Item2);

            var t = Task.Factory.StartNew(() => { return GetTileData(imageTile.Item1, imageTile.Item2); }, TaskCreationOptions.LongRunning).ContinueWith(tsk =>
            {
                if (tsk.Status == TaskStatus.RanToCompletion)
                {
                    var response = tsk.Result.Result.Content.ReadAsByteArrayAsync().Result;
                    images.Add(imageTile, Image.FromStream(new MemoryStream(response)));
                }
                else
                {
                    failedTiles.Add(imageTile);
                }
            });

            tasks.Add(t);
        }

        Task.WaitAll(tasks.ToArray());

        if (failedTiles.Count > 0)
        {
            Console.WriteLine($"Retrying {failedTiles.Count} requests");
            Thread.Sleep(500);
            Dictionary<Tuple<int, int>, Image> retriedImages = GetTilesParallel(failedTiles, retry++);

            foreach (KeyValuePair<Tuple<int, int>, Image> retriedImage in retriedImages)
            {
                images.Add(retriedImage.Key, retriedImage.Value);
            }
        }
        return images;
    }
Run Code Online (Sandbox Code Playgroud)

这是实际执行 HTTP 请求的方法(我知道不是最优的或干净的,但我首先尝试让某些东西工作)。

    private async Task<HttpResponseMessage> GetTileData(int tileColumn, int tileRow)
    {
        WMTSSettings settings = Service.Settings;

        Dictionary<string, string> requestParams = new Dictionary<string, string>();
        requestParams.Add("Request", "GetTile");
        requestParams.Add("Style", "Default");
        requestParams.Add("Service", "WMTS");
        requestParams.Add("Version", this.Service.Version);
        requestParams.Add("TileMatrixSet", settings.WMTSTileMatrixSet);
        requestParams.Add("TileMatrix", settings.WMTSTileMatrixSet + ":" + settings.WMTSTileMatrix);
        requestParams.Add("Format", settings.ImageFormat);
        requestParams.Add("Layer", settings.Layer);
        requestParams.Add("TileCol", tileColumn.ToString());
        requestParams.Add("TileRow", tileRow.ToString());

        string requestString = this.Service.BaseUri;

        for (int i = 0; i < requestParams.Count; i++)
        {
            if (i == 0)
            {
                requestString += "?";
            }

            requestString += requestParams.ElementAt(i).Key;
            requestString += "=";
            requestString += requestParams.ElementAt(i).Value;

            if (i != requestParams.Count - 1)
            {
                requestString += "&";
            }
        }

        CancellationTokenSource source = new CancellationTokenSource();
        CancellationToken token = source.Token;

        Task<HttpResponseMessage> response = HttppClient.GetAsync(requestString, token);

        return await response;
    }
Run Code Online (Sandbox Code Playgroud)

我目前面临两个问题,我已经尝试了很多方法:

  • 在当前的设置中,在我的ContinueWith任务中,我收到一些奇怪的错误,告诉我“对象引用未设置为对象的实例”。即使任务中的response变量和imageTile变量ContinueWith不为null?
  • 另一个问题是我仍然收到 TaskCancellationExceptions。但如果我是正确的,这些异常应该被继续任务捕获吗?

有人可以为我指出这个问题的正确方向吗?或者说,Tasks 就是正确的选择吗?

The*_*ias 5

是的,任务是要走的路,但不,ContinueWith 不是要走的路。这种方法主要是前 async-await 时代的遗物,现在很少有用。同样的道理Task.Factory.StartNew:引入该方法后,你很少需要使用该方法Task.Run

创建下载图块数据所需的任务的一种便捷方法是 LINQSelect运算符。你可以这样使用它:

async Task<Dictionary<(int, int), Image>> GetAllTileDataAsync(List<(int, int)> tiles)
{
    Task<(int, int, Image)>[] tasks = tiles.Select(async tile =>
    {
        (int tileColumn, int tileRow) = tile;
        int retry = 0;
        while (true)
        {
            try
            {
                using HttpResponseMessage response = await GetTileDataAsync(
                    tileColumn, tileRow);
                response.EnsureSuccessStatusCode();
                byte[] bytes = await response.Content.ReadAsByteArrayAsync();
                Image image = Image.FromStream(new MemoryStream(bytes));
                return (tileColumn, tileRow, image);
            }
            catch
            {
                if (retry >= 3) throw;
            }
            await Task.Delay(1000);
            retry++;
        }
    }).ToArray();
    (int, int, Image)[] results = await Task.WhenAll(tasks);
    return results.ToDictionary(e => (e.Item1, e.Item2), e => e.Item3);
}
Run Code Online (Sandbox Code Playgroud)

每个图块都被投影到一个Task<(int, int, Image)>. 任务的结果包含有关图块的所有初始和获取的信息。这消除了依赖危险的副作用来构建最终的Dictionary.

请注意上面代码中缺少任何Task.Factory.StartNew, .ContinueWith, .Result,.Wait()和。Task.WaitAll所有这些方法在现代异步应用程序中都是危险信号。一切都是由 async/await 组合发生的。不会创建任何线程,也不会阻塞任何线程,您的应用程序将达到最大的可扩展性和响应能力。

  • @aepot 是的,确实如此。为了简单起见,我添加了它(“IEnumerable&lt;Task&lt;(int, int, Image)&gt;&gt;”类型看起来令人畏惧),并且因为它可以防止误用。很容易意外地枚举两次“IEnumerable”,然后因为所有任务都执行了两次而感到沮丧。实现“IEnumerable”可以防止这种情况发生。 (2认同)
  • @aepot我添加了一个“using”语句来处理“HttpResponseMessage”。如果“GetTileDataAsync”返回“Task&lt;Image&gt;”而不是“Task&lt;HttpResponseMessage&gt;”,可能会更好。 (2认同)