Col*_*ett 5 c# web-crawler tpl-dataflow .net-core polly
介绍:
我正在构建一个单节点网络爬虫来简单地验证200 OK.NET Core 控制台应用程序中的URL 。我在不同的主机上有一组 URL,我用HttpClient. 我对使用 Polly 和 TPL Dataflow 还很陌生。
要求:
MaxDegreeOfParallelism.429 TooManyRequests使用 Polly 策略优雅地处理每个主机的响应。或者,我可以使用断路器在收到一个429响应时取消对同一主机的并发请求,然后一次一个地处理该特定主机?当前实施:
我当前的实现是有效的,除了我经常看到x对同一主机的并行请求429大约在同一时间返回......然后,他们都暂停重试策略......然后,他们都猛烈抨击同一台主机再次同时经常仍然收到429s。即使我在整个队列中均匀分布同一主机的多个实例,我的 URL 集合也会因一些429最终仍开始生成s 的特定主机而超重。
收到 a 后429,我想我只想向该主机发送一个并发请求,以尊重远程主机并追求200s。
验证器方法:
public async Task<int> GetValidCount(IEnumerable<Uri> urls, CancellationToken cancellationToken)
{
var validator = new TransformBlock<Uri, bool>(
async u => (await _httpClient.GetAsync(u, HttpCompletionOption.ResponseHeadersRead, cancellationToken)).IsSuccessStatusCode,
new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = MaxDegreeOfParallelism}
);
foreach (var url in urls)
await validator.SendAsync(url, cancellationToken);
validator.Complete();
var validUrlCount = 0;
while (await validator.OutputAvailableAsync(cancellationToken))
{
if(await validator.ReceiveAsync(cancellationToken))
validUrlCount++;
}
await validator.Completion;
return validUrlCount;
}
Run Code Online (Sandbox Code Playgroud)
应用于GetValidCount()上面使用的 HttpClient 实例的 Polly 策略。
IAsyncPolicy<HttpResponseMessage> waitAndRetryTooManyRequests = Policy
.HandleResult<HttpResponseMessage>(r => r.StatusCode == HttpStatusCode.TooManyRequests)
.WaitAndRetryAsync(3,
(retryCount, response, context) =>
response.Result?.Headers.RetryAfter.Delta ?? TimeSpan.FromMilliseconds(120),
async (response, timespan, retryCount, context) =>
{
// log stuff
});
Run Code Online (Sandbox Code Playgroud)
题:
我如何修改或替换此解决方案以增加对需求 #2 的满意度?
我会尝试引入某种标志LimitedMode 来检测该特定客户端是否以有限模式进入。下面我声明了两个策略 - 一个简单的重试策略只是为了捕获 TooManyRequests 并设置标志。第二个策略是开箱即用的BulkHead策略。
public void ConfigureServices(IServiceCollection services)
{
/* other configuration */
var registry = services.AddPolicyRegistry();
var catchPolicy = Policy.HandleResult<HttpResponseMessage>(r =>
{
LimitedMode = r.StatusCode == HttpStatusCode.TooManyRequests;
return false;
})
.WaitAndRetryAsync(1, i => TimeSpan.FromSeconds(3));
var bulkHead = Policy.BulkheadAsync<HttpResponseMessage>(1, 10, OnBulkheadRejectedAsync);
registry.Add("catchPolicy", catchPolicy);
registry.Add("bulkHead", bulkHead);
services.AddHttpClient<CrapyWeatherApiClient>((client) =>
{
client.BaseAddress = new Uri("hosturl");
}).AddPolicyHandlerFromRegistry(PolicySelector);
}
Run Code Online (Sandbox Code Playgroud)
然后,您可能希望使用以下机制动态决定应用哪个策略PolicySelector:如果受限模式处于活动状态,则将 Bulk Head 策略与 catch 429 策略一起包装。如果收到成功状态代码 - 切换回无隔板的常规模式。
private IAsyncPolicy<HttpResponseMessage> PolicySelector(IReadOnlyPolicyRegistry<string> registry, HttpRequestMessage request)
{
var catchPolicy = registry.Get<IAsyncPolicy<HttpResponseMessage>>("catchPolicy");
var bulkHead = registry.Get<IAsyncPolicy<HttpResponseMessage>>("bulkHead");
if (LimitedMode)
{
return catchPolicy.WrapAsync(bulkHead);
}
return catchPolicy;
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
697 次 |
| 最近记录: |