ona*_*atm 4 c# threadpool async-await nancy dotnet-httpclient
我们有一个面向微服务的后端堆栈.所有的微服务都建立Nancy在windows服务之上并注册为windows服务topshelf.
处理大多数流量(~5000 req/s)的服务之一,在8个服务器中的3个服务器上开始出现线程池饥饿问题.
这是我们遇到特定端点时遇到的异常:
System.InvalidOperationException: There were not enough free threads in the ThreadPool to complete the operation.
at System.Net.HttpWebRequest.BeginGetResponse(AsyncCallback callback, Object state)
at System.Net.Http.HttpClientHandler.StartGettingResponse(RequestState state)
at System.Net.Http.HttpClientHandler.StartRequest(Object obj)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at RandomNamedClient.<GetProductBySkuAsync>d__20.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at ProductService.<GetBySkuAsync>d__3.MoveNext() in ...\ProductService.cs:line 34
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at ProductModule.<>c__DisplayClass15.<<.ctor>b__b>d__1d.MoveNext() in ...\ProductModule.cs:line 32
Run Code Online (Sandbox Code Playgroud)
此端点调用另一个服务 - 它不在我的团队的域中 - 以获取产品数据.执行情况如下:
Get["/product/sku/{sku}", true] = async (parameters, ctx) =>
{
string sku = parameters.sku;
var product = await productService.GetBySkuAsync(sku);
return Response.AsJson(new ProductRepresentation(product));
};
Run Code Online (Sandbox Code Playgroud)
ProductService.GetBySkuAsync(string sku) 执行:
public async Task<Product> GetBySkuAsync(string sku)
{
var productDto = await randomNamedClient.GetProductBySkuAsync(sku);
if (productDto == null)
{
throw new ProductDtoNotFoundException("sku", sku);
}
var variantDto = productDto.VariantList.FirstOrDefault(v => v.Sku == sku);
if (variantDto == null)
{
throw new ProductVariantDtoNotFoundException("sku", sku);
}
return MapVariantDtoToProduct(variantDto, productDto);
}
Run Code Online (Sandbox Code Playgroud)
RandomNamedClient.GetProductBySkuAsync(string sku) 实现(来自内部包):
public async Task<ProductDto> GetProductBySkuAsync(string sku)
{
HttpResponseMessage result = await this._serviceClient.GetAsync("Product?Sku=" + sku);
return result == null || result.StatusCode != HttpStatusCode.OK ? (ProductDto) null : this.Decompress<ProductDto>(result);
}
Run Code Online (Sandbox Code Playgroud)
RandomNamedClient.Decompress<T>(HttpResponseMessage response) 执行:
private T Decompress<T>(HttpResponseMessage response)
{
if (!response.Content.Headers.ContentEncoding.Contains("gzip"))
return HttpContentExtensions.ReadAsAsync<T>(response.Content).Result;
using (GZipStream gzipStream = new GZipStream((Stream) new MemoryStream(response.Content.ReadAsByteArrayAsync().Result), CompressionMode.Decompress))
{
byte[] buffer = new byte[8192];
using (MemoryStream memoryStream = new MemoryStream())
{
int count;
do
{
count = gzipStream.Read(buffer, 0, 8192);
if (count > 0)
memoryStream.Write(buffer, 0, count);
}
while (count > 0);
return JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(memoryStream.ToArray()));
}
}
}
Run Code Online (Sandbox Code Playgroud)
我们所有的服务都是作为Release/32位构建的.我们没有调整有关线程池使用的任何信息.
我在这段代码中看到的最大问题是Decompress<T>阻塞异步操作的方法Task.Result.这可能会阻止当前正在处理请求的线程的检索到线程池,或者更糟糕的是导致代码中的死锁(这正是您不应该阻止异步代码的原因).我不确定你是否已经看到这些请求得到彻底处理,但是如果NancyFX正在处理同步上下文的编组(看起来确实如此),这很可能是线程池饥饿的根本原因.
您可以通过使所有IO处理在该方法内部工作来改变这一点async,并利用这些类已经公开的自然异步API.或者,我绝对不建议这样做,你可以ConfigureAwait(false)随处使用.
(旁注 - 您可以通过使用简化代码Stream.CopyToAsync())
正确的异步实现如下所示:
private async Task<T> DecompressAsync<T>(HttpResponseMessage response)
{
if (!response.Content.Headers.ContentEncoding.Contains("gzip"))
return await response.Content.ReadAsAsync<T>();
const int bufferSize = 8192;
using (GZipStream gzipStream = new GZipStream(
new MemoryStream(
await response.Content.ReadAsByteArrayAsync()),
CompressionMode.Decompress))
using (MemoryStream memoryStream = new MemoryStream())
{
await gzipStream.CopyToAsync(memoryStream, bufferSize);
return JsonConvert.DeserializeObject<T>(
Encoding.UTF8.GetString(memoryStream.ToArray()));
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1029 次 |
| 最近记录: |