如何限制.net Core API项目中多个线程上对HttpClient的所有传出异步调用

Tro*_*roy 1 semaphore dotnet-httpclient asp.net-core httpclientfactory

我正在设计一个.net核心Web API,它使用了我无法控制的外部API。我在堆栈溢出中找到了一些极好的答案,这些问题使我可以在使用semaphoreslim的同一线程中限制对此外部API的请求。我想知道如何最好地将这种限制扩展到整个应用程序,而不仅仅是限制特定任务列表。我一直在学习HttpMessageHandlers,这似乎是拦截所有传出消息并应用限制的一种可能方法。但是我担心线程安全性和锁定问题,我可能不了解。我包括了当前的限制代码,希望对理解我正在尝试做的事情有所帮助,但是要跨越多个线程,并且不断添加任务而不是预先定义的任务列表。

private static async Task<List<iMISPagedResultResponse>> GetAsyncThrottled(List<int> pages, int throttle, IiMISClient client, string url, int limit)
{
        var rtn = new List<PagedResultResponse>();
        var allTasks = new List<Task>();
        var throttler = new SemaphoreSlim(initialCount: throttle);
        foreach (var page in pages)
        {
            await throttler.WaitAsync();
            allTasks.Add(
                Task.Run(async () =>
                {
                    try
                    {
                        var result = await GetPagedResult(client, url, page);
                        return result;
                    }
                    finally
                    {
                        throttler.Release();
                    }
                }));
        }
        await Task.WhenAll(allTasks);
        foreach (var task in allTasks)
        {
            var result = ((Task<PagedResultResponse>)task).Result;
            rtn.Add(result);
        }
        return rtn;
}
Run Code Online (Sandbox Code Playgroud)

mou*_*ler 8

概念问题

实施简单

所以ThrottlingDelegatingHandler可能看起来像这样:

public class ThrottlingDelegatingHandler : DelegatingHandler
{
    private SemaphoreSlim _throttler;

    public ThrottlingDelegatingHandler(SemaphoreSlim throttler)
    {
        _throttler = throttler ?? throw new ArgumentNullException(nameof(throttler));
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        if (request == null) throw new ArgumentNullException(nameof(request));

        await _throttler.WaitAsync(cancellationToken);
        try
        {
            return await base.SendAsync(request, cancellationToken);
        }
        finally
        {
            _throttler.Release();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

创建并维护一个实例作为单例:

int maxParallelism = 10;
var throttle = new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)); 
Run Code Online (Sandbox Code Playgroud)

将其DelegatingHandler应用于HttpClient要通过并行限制调用的所有实例:

HttpClient throttledClient = new HttpClient(throttle);
Run Code Online (Sandbox Code Playgroud)

HttpClient不必是单例的:只有throttle实例需要。

为了简洁起见,我省略了Dot Net Core DI代码,但是您将在ThrottlingDelegatingHandler.Net Core的容器中注册单例实例,在使用时通过DI获取该单例,并在HttpClient如上所示的s中使用它。

但:

使用HttpClientFactory(.NET Core 2.1)更好地实现

上面仍然提出了一个问题,您将如何管理HttpClient生命周期:

  • 辛格尔顿(应用范围的)HttpClient小号不收拾DNS更新。您的应用程序将不知道DNS更新,除非您终止并重新启动它(可能是不可取的)。
  • using (HttpClient client = ) { }另一方面,频繁创建和处理的模式可能导致套接字耗尽

的设计目标之一HttpClientFactory是管理HttpClient实例及其委派处理程序的生命周期,以避免这些问题。

在.NET Core 2.1中,您可以在类HttpClientFactory中将其全部连接起来,如下所示:ConfigureServices(IServiceCollection services)Startup

int maxParallelism = 10;
services.AddSingleton<ThrottlingDelegatingHandler>(new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)));

services.AddHttpClient("MyThrottledClient")
    .AddHttpMessageHandler<ThrottlingDelegatingHandler>();
Run Code Online (Sandbox Code Playgroud)

(这里的“ MyThrottledClient”是一种命名客户端方法,只是为了使本示例简短;键入的客户端避免了字符串命名。)

在使用时,IHttpClientFactory通过DI(引用)获得一个,然后调用

var client = _clientFactory.CreateClient("MyThrottledClient");
Run Code Online (Sandbox Code Playgroud)

获得HttpClient使用singleton预先配置的实例ThrottlingDelegatingHandler

通过HttpClient这种方式获得的实例中的所有调用都将被限制(通常在整个应用中)到最初配置的int maxParallelism

HttpClientFactory神奇地处理了所有HttpClient生命周期问题。

将Polly与IHttpClientFactory结合使用以获取所有“现成的”结果

Polly 与IHttpClientFactory紧密集成,并且Polly还提供了Bulkhead策略,该策略通过相同的SemaphoreSlim机制用作并行调节器

因此,除了手动滚动a之外ThrottlingDelegatingHandler,您还可以直接使用带有IHttpClientFactory的Polly Bulkhead策略。在您的Startup课程中,只需:

int maxParallelism = 10;
var throttler = Policy.BulkheadAsync<HttpResponseMessage>(maxParallelism, Int32.MaxValue);

services.AddHttpClient("MyThrottledClient")
    .AddPolicyHandler(throttler);
Run Code Online (Sandbox Code Playgroud)

HttpClient如前所述,从HttpClientFactory 获取预配置的实例。和以前一样,通过这样的“ MyThrottledClient” HttpClient实例的所有调用都将被并行限制到configure maxParallelism

Polly Bulkhead策略还提供了配置要允许同时“排队”主信号灯中的执行插槽的操作数量的功能。因此,例如:

var throttler = Policy.BulkheadAsync<HttpResponseMessage>(10, 100);
Run Code Online (Sandbox Code Playgroud)

如上配置为时HttpClient,将允许10个并行http调用,以及最多100个http调用以“排队”执行插槽。通过防止故障下游系统引起上游排队呼叫的过多资源膨胀,这可以为高吞吐量系统提供额外的弹性。

要将Polly选项与HttpClientFactory一起使用,请引入Microsoft.Extensions.Http.PollyPollynuget软件包。

参考:Polly和IHttpClientFactory上的Polly deep doco隔壁政策


附录任务

问题使用Task.Run(...)并提及:

消耗外部api的.net核心Web api

和:

不断添加任务,而不是预先定义的任务列表。

如果您的.net核心Web api 每个请求仅使用一次外部API ,.net核心Web api处理,并且您采用了此答案其余部分中讨论的方法TaskTask.Run(...)则无需将下游外部http调用卸载为new ,并且仅在其他Task实例和线程切换中产生开销。点网核心将已经在线程池上的多个线程上运行传入的请求。