我正在使用akka流来处理我的数据.其中我有1个由元素UUID组成的Source.
流程如下:
现在我想为这个流添加重试机制,这样如果流中的任何阶段都失败了,它应该重试阶段一段时间没有说3,如果之后它失败则应该发生流的唯一失败.例如,如果第三方服务存在一些问题,例如HTTP 504错误,则在重试此元素成功后的大部分时间.那么在akka中有没有办法实现这一目标.
目前,我维护1个列表来存储所有失败的元素ID,如下所示.
代码:
List<UUID> failedId = new CopyOnWriteArrayList<>();
Source.from(elementIdToProcess).map(f -> {
failedId.add(f);
return f;
}).via(featureFlow()).via(filterNullFeaturesFlow())
.via(mapToFeatureFlow()).via(setFeaturesAdditionalInfo())
.runForeach(features -> {
features.parallelStream().forEach(feature -> {
try {
featureCommitter.save(feature);
failedId.remove(UUID.fromString(feature.getFeatureId()));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}, materializer);
Run Code Online (Sandbox Code Playgroud) 我正在实施 Polly 以在我的 C# Web 应用程序中重试请求。我的示例代码包含在这篇文章中。代码按预期工作,但传递给 CreateFile() 的最后一个参数(当前硬编码为 0)需要是 retryAttempt 的值。如何在执行操作中获取 retryAttempt 的值?
return Policy
.Handle<HttpException>(x => x.StatusCode == (HttpStatusCode)429)
.Or<StorageException>()
.WaitAndRetry(maxRetryCount, retryAttempt => TimeSpan.FromMilliseconds(Math.Pow(retryIntervalMs, retryAttempt)))
.Execute(() => CreateFile(fileContent, containerName, fileName, connectionString, 0));
Run Code Online (Sandbox Code Playgroud) Polly 策略的封装顺序会改变最终结果。如果我想使用以下策略,哪个顺序最好?这是我能想到的最好的顺序:重试应该提交到舱壁限制,而熔断是遵循超时策略的较低级别策略。说得通?
services.AddHttpClient<IService, Service>()
.AddPolicyHandler(PolicyConfig.RetryPolicy);
.AddPolicyHandler(PolicyConfig.BulkheadPolicy)
.AddPolicyHandler(PolicyConfig.CircuitBreakingPolicy)
.AddPolicyHandler(PolicyConfig.TimeoutPolicy)
Run Code Online (Sandbox Code Playgroud) 我在 Startup.ConfigureServices 方法中在 HttpClient 上创建了重试策略。另请注意,默认情况下,asp.net core 2.1 为 HttpClient 进行的每个调用记录 4 行 [信息] 行,这些行显示在我的问题末尾的日志中。
\nservices.AddHttpClient("ResilientClient")\n .AddPolicyHandler(\n Policy.WrapAsync(\n PollyRetryPolicies.TransientErrorRetryPolicy(),\n Policy.TimeoutAsync<HttpResponseMessage>(TimeSpan.FromSeconds(60))));\n
Run Code Online (Sandbox Code Playgroud)\n该策略定义如下。请注意,我将重试尝试写入日志,因此我会知道是否调用了重试策略。
\npublic static IAsyncPolicy < HttpResponseMessage > TransientErrorRetryPolicy() {\n return HttpPolicyExtensions\n .HandleTransientHttpError()\n .Or < TimeoutRejectedException > ()\n .WaitAndRetryAsync(sleepDurations: ExponentialBackoffPolicy.DecorrelatedJitter(3, SEED_DELAY, MAX_DELAY),\n onRetry: (message, timespan, attempt, context) => {\n context.GetLogger() ? .LogInformation($ "Retrying request to {message?.Result?.RequestMessage?.RequestUri} in {timespan.TotalSeconds} seconds. Retry attempt {attempt}.");\n });\n}\n
Run Code Online (Sandbox Code Playgroud)\nHandleTransientHttpError() 是一个 Polly 扩展,它在注释中指出:
\n\n\n配置要处理的条件是:\n\xe2\x80\xa2 网络故障(如 System.Net.Http.HttpRequestException)
\n
我的httpclient用法是这样的:
\nusing (HttpResponseMessage …
Run Code Online (Sandbox Code Playgroud) requests
我正在使用一种有点标准的模式来在 Python 中的请求周围放置重试行为,
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
retry_strategy = Retry(
total=HTTP_RETRY_LIMIT,
status_forcelist=HTTP_RETRY_CODES,
method_whitelist=HTTP_RETRY_METHODS,
backoff_factor=HTTP_BACKOFF_FACTOR
)
adapter = HTTPAdapter(max_retries=retry_strategy)
http = requests.Session()
http.mount("https://", adapter)
http.mount("http://", adapter)
...
try:
response = http.get(... some request params ...)
except requests.Exceptions.RetryError as err:
# Do logic with err to perform error handling & logging.
Run Code Online (Sandbox Code Playgroud)
不幸的是,RetryError 上的文档没有解释任何内容,当我如上所述拦截异常对象时,err.response
是None
. 虽然您可以调用str(err)
来获取异常的消息字符串,但这将需要不合理的字符串解析来尝试恢复特定的响应详细信息,即使有人愿意尝试,该消息实际上也会忽略必要的详细信息。例如,来自某个站点上的故意调用的一个此类响应给出了 400 秒(并不是说您真的会重试此操作,而只是为了调试),它会给出一条消息 -"(Caused by ResponseError('too many 400 error responses'))"
它忽略了实际的响应详细信息,例如请求站点自己的描述文本400 错误的性质(这对于确定处理至关重要,甚至只是回传以记录错误)。
我想要做的是接收 …
前段时间,我需要retry
R中的一个函数来处理缓慢的服务器响应.该函数将具有以下行为:(尝试操作(函数或方法),如果失败,请稍等一下,然后重试)x10
我想出了以下内容:
retry <- function(fun, max_trys = 10, init = 0){
suppressWarnings(tryCatch({
Sys.sleep(0.3);
if(init<max_trys) {fun}
}, error=function(e){retry(fun, max_trys, init = init+1)}))}
Run Code Online (Sandbox Code Playgroud)
它运作良好.现在我在Python3中需要相同的东西,所以我尝试制作相同的代码:
import time
def retry_fun(fun, max_trys = 10, init=0):
try:
time.sleep(0.3)
if(init<max_trys):
fun
except:
retry_fun(fun, max_trys, init = init+1)
Run Code Online (Sandbox Code Playgroud)
但是当我运行它时,它会崩溃我的内核.由于我是Python的初学者,我不确定是什么导致了崩溃,以及函数是否/如何作为参数传递到另一个函数.
你能救我吗?
如果 HTTP 调用失败,我在非常基本的场景中使用 Polly 进行指数退避:
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
return await HandleTransientHttpError()
.Or<TimeoutException>()
.WaitAndRetryAsync(4, retryAttempt => TimeSpan.FromSeconds(Math.Pow(3, retryAttempt)))
.ExecuteAsync(async () => await base.SendAsync(request, cancellationToken).ConfigureAwait(false));
}
private static PolicyBuilder<HttpResponseMessage> HandleTransientHttpError()
{
return Policy
.HandleResult<HttpResponseMessage>(response => (int)response.StatusCode >= 500 || response.StatusCode == System.Net.HttpStatusCode.RequestTimeout)
.Or<HttpRequestException>();
}
Run Code Online (Sandbox Code Playgroud)
我有一个测试 API,它只是HttpListener
在while(true)
. 目前,我正在尝试测试客户端在每次调用收到 500 时是否正确重试。
while (true)
{
listener.Start();
Console.WriteLine("Listening...");
HttpListenerContext context = listener.GetContext();
HttpListenerRequest request = context.Request;
HttpListenerResponse response = context.Response;
response.StatusCode = (int)HttpStatusCode.InternalServerError;
//Thread.Sleep(1000 * 1); …
Run Code Online (Sandbox Code Playgroud) 我正在寻找HttpClient
对 a进行单元测试Polly
RetryPolicy
,并且我正在尝试找出如何控制响应的内容HTTP
。
我在客户端上使用了 a HttpMessageHandler
,然后覆盖发送异步,这效果很好,但是当我添加 Polly 重试策略时,我必须使用 a 创建 HTTP 客户端的实例IServiceCollection
,并且无法HttpMessageHandler
为客户端创建 a。我尝试过使用,.AddHttpMessageHandler()
但这会阻止轮询重试策略,并且只会触发一次。
这就是我在测试中设置 HTTP 客户端的方式
IServiceCollection services = new ServiceCollection();
const string TestClient = "TestClient";
services.AddHttpClient(name: TestClient)
.AddHttpMessageHandler()
.SetHandlerLifetime(TimeSpan.FromMinutes(5))
.AddPolicyHandler(KYA_GroupService.ProductMessage.ProductMessageHandler.GetRetryPolicy());
HttpClient configuredClient =
services
.BuildServiceProvider()
.GetRequiredService<IHttpClientFactory>()
.CreateClient(TestClient);
public static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.WaitAndRetryAsync(6,
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
onRetryAsync: OnRetryAsync);
}
private async static Task OnRetryAsync(DelegateResult<HttpResponseMessage> outcome, TimeSpan timespan, int retryCount, Context context)
{ …
Run Code Online (Sandbox Code Playgroud) 我正在使用 Polly 重试 HttpClient 尝试:
\n services.AddHttpClient<JoinPackageApiClient>(jp => { jp.BaseAddress = new Uri(appSettings.JoinPackageWS.BaseUrl); })\n .AddPolicyHandler(GetRetryPolicy(appSettings, serviceProvider))\n
Run Code Online (Sandbox Code Playgroud)\n哪里 GetRetryPolicy
:
private static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy(AppSettings appSettings, ServiceProvider serviceProvider)\n {\n return HttpPolicyExtensions\n .HandleTransientHttpError()\n .OrResult(msg => msg.StatusCode != HttpStatusCode.OK)\n .Or<TimeoutRejectedException>()\n .Or<TaskCanceledException>()\n .Or<OperationCanceledException>()\n\n .WaitAndRetryAsync(appSettings.PollySettings.RetryAttempts, (retryAttempt, c) =>\n {\n\n return TimeSpan.FromSeconds(2);\n }, onRetry: (response, delay, retryCount, context) =>\n {\n\n //\xe2\x96\x88how can I access the full(!) HttpClient's URI here ?\n //e.g. : https://a.com/b/c?d=1\n \n });\n }\n
Run Code Online (Sandbox Code Playgroud)\n问题:
\n查看该onRetry
参数,我想在该onRetry
部分中记录完整的 URL …
我将 Polly ( Microsoft.Extensions.Http.Polly
) 与 .net core 一起使用,并进行此配置(使用无效的 URL,用于测试):
private static void RegisterServices()
{
var collection = new ServiceCollection();
var timeoutPolicy = Policy.TimeoutAsync<HttpResponseMessage>(2); // Timeout for an individual try
collection.AddHttpClient<INetworkService, NetworkService>(url=>
{
url.BaseAddress = new Uri("http://www.google.com:81"); //test bad url
})
.AddPolicyHandler(GetRetryPolicy())
.AddPolicyHandler(timeoutPolicy); ;
_serviceProvider = collection.BuildServiceProvider();
}
Run Code Online (Sandbox Code Playgroud)
哪里GetRetryPolicy
:
private static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.OrResult(msg => msg.StatusCode != HttpStatusCode.OK)
.Or<TimeoutRejectedException>()
.Or<TaskCanceledException>()
.Or<OperationCanceledException>()
.WaitAndRetryAsync(3, retryAttempt =>
{
return TimeSpan.FromSeconds(2);
},
onRetry: (response, delay, retryCount, context) => …
Run Code Online (Sandbox Code Playgroud) retry-logic ×10
c# ×7
polly ×7
.net-core ×2
python ×2
python-3.x ×2
.net ×1
akka ×1
akka-stream ×1
bulkhead ×1
exception ×1
java ×1
unit-testing ×1