当代码等待延迟时间不确定的某种情况时,看起来很多人选择使用指数退避,即等待N秒,检查条件是否满足; 如果没有,等待2N秒,检查条件等.这对于检查恒定/线性增加的时间跨度有什么好处?
linear-programming non-deterministic exponential-backoff retry-logic
我正在尝试实现一个基于Spring Boot的Kafka消费者,它具有一些非常强大的消息传递保证,即使在出现错误的情况下也是如此.
我们当前的实施满足以下要求:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
final ContainerProperties containerProperties = factory.getContainerProperties();
containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
containerProperties.setErrorHandler(errorHandler());
return factory;
}
@Bean
public RetryTemplate retryTemplate() {
final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(1.5);
final RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(new AlwaysRetryPolicy());
template.setBackOffPolicy(backOffPolicy);
return template;
}
@Bean
public ErrorHandler errorHandler() {
return new SeekToCurrentErrorHandler();
}
Run Code Online (Sandbox Code Playgroud)
但是,在这里,记录永远被消费者锁定.在某些时候,处理时间将超过max.poll.interval.ms
,服务器将分区重新分配给其他消费者,从而创建一个副本.
假设max.poll.interval.ms
等于5分钟(默认)并且故障持续30分钟,这将导致消息被处理ca. 6次.
另一种可能性是在N次重试(例如3次尝试)之后通过使用将消息返回到队列SimpleRetryPolicy
.然后,将重播该消息(感谢SeekToCurrentErrorHandler
)并且处理将从头开始,最多再次尝试5次.这导致延迟形成一系列例如
10 secs …
Run Code Online (Sandbox Code Playgroud) messaging apache-kafka spring-retry spring-kafka exponential-backoff
我刚刚升级到 AWS SDK V3,我不知道如何使用它配置 retryDelayOptions 和 customBackoff。我在AWS自己的API参考或网上找不到任何示例代码。这就是我之前所做的:
retryDelayOptions: { customBackoff: (retryCount) => 2 ** (retryCount * 100) },
maxRetries: 2
Run Code Online (Sandbox Code Playgroud)
我将上述内容作为选项传递给客户端构造函数。V3 的重试似乎发生了很大变化,如果没有任何示例,我无法理解 API。任何帮助深表感谢
问候,迪帕克
javascript amazon-web-services exponential-backoff retry-logic aws-sdk-js-v3
这是我见过的一些示例代码.
int expBackoff = (int) Math.pow(2, retryCount);
int maxJitter = (int) Math.ceil(expBackoff*0.2);
int finalBackoff = expBackoff + random.nextInt(maxJitter);
Run Code Online (Sandbox Code Playgroud)
我想知道在这里使用随机抖动有什么好处?
我AmazonDynamoDBClient
使用以下方法创建了一个标准AmazonDynamoDBClientBuilder
:
AmazonDynamoDBClient client = AmazonDynamoDBClientBuilder.standard().build();
Run Code Online (Sandbox Code Playgroud)
在AmazonDynamoDBClient 的文档中,它提到:
ProvisionedThroughputExceededException
- 您的请求率太高。适用于 DynamoDB 的 AWS 开发工具包会自动重试收到此异常的请求。您的请求最终会成功,除非您的重试队列太大而无法完成。减少请求频率并使用指数退避。
当重试由于 原因而失败的请求时,标准客户端是否默认使用指数退避ProvisionedThroughputExceededException
?或者这是我需要手动配置的东西?
如果 HTTP 调用失败,我在非常基本的场景中使用 Polly 进行指数退避:
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
return await HandleTransientHttpError()
.Or<TimeoutException>()
.WaitAndRetryAsync(4, retryAttempt => TimeSpan.FromSeconds(Math.Pow(3, retryAttempt)))
.ExecuteAsync(async () => await base.SendAsync(request, cancellationToken).ConfigureAwait(false));
}
private static PolicyBuilder<HttpResponseMessage> HandleTransientHttpError()
{
return Policy
.HandleResult<HttpResponseMessage>(response => (int)response.StatusCode >= 500 || response.StatusCode == System.Net.HttpStatusCode.RequestTimeout)
.Or<HttpRequestException>();
}
Run Code Online (Sandbox Code Playgroud)
我有一个测试 API,它只是HttpListener
在while(true)
. 目前,我正在尝试测试客户端在每次调用收到 500 时是否正确重试。
while (true)
{
listener.Start();
Console.WriteLine("Listening...");
HttpListenerContext context = listener.GetContext();
HttpListenerRequest request = context.Request;
HttpListenerResponse response = context.Response;
response.StatusCode = (int)HttpStatusCode.InternalServerError;
//Thread.Sleep(1000 * 1); …
Run Code Online (Sandbox Code Playgroud) 我检查了调用之间的恒定间隔,发现在这个无限循环中,连续调用之间的时间不是 5 秒,而是随机变化的,尽管不到 5 秒。不明白,为什么。
from datetime import datetime
from backoff import on_predicate, constant
@on_predicate(constant, interval=5)
def fnc(i):
print('%s %d' % (datetime.now().strftime("%H:%M:%S:%f"),i), flush=True)
return i
for i in range(7):
fnc(i)
Run Code Online (Sandbox Code Playgroud)
输出:
17:48:48:348775 0
17:48:50:898752 0
17:48:52:686353 0
17:48:53:037900 0
17:48:57:264762 0
17:48:58:348803 0
Run Code Online (Sandbox Code Playgroud) 该cloud.google.com/go/pubsub
库最近发布(在 v1.5.0 中,参见https://github.com/googleapis/google-cloud-go/releases/tag/pubsub%2Fv1.5.0)支持新的RetryPolicy
服务器端功能。当前读取的文档(https://godoc.org/cloud.google.com/go/pubsub#RetryPolicy)
我读过维基百科文章,虽然它描述了离散时间的指数退避,但我没有看到这篇文章与MinimumBackoff
和MaximumBackoff
参数有什么具体关系。有关此指导下,我提到的文档github.com/cenkalti/backoff
,https://pkg.go.dev/github.com/cenkalti/backoff/v4?tab=doc#ExponentialBackOff。该库将一个定义ExponentialBackoff
为
type ExponentialBackOff struct {
InitialInterval time.Duration
RandomizationFactor float64
Multiplier float64
MaxInterval time.Duration
// After MaxElapsedTime the ExponentialBackOff returns Stop.
// It never stops if MaxElapsedTime == 0.
MaxElapsedTime time.Duration
Stop time.Duration
Clock Clock
// contains filtered or unexported fields
}
Run Code Online (Sandbox Code Playgroud)
其中每个随机区间计算为
randomized interval =
RetryInterval * (random value in range [1 - RandomizationFactor, 1 + RandomizationFactor])
Run Code Online (Sandbox Code Playgroud)
哪里RetryInterval …
go publish-subscribe google-cloud-pubsub exponential-backoff
我有一个 API,它需要Observable
触发一个事件。
我想返回一个,如果检测到互联网连接,则每秒Observable
发出一个值,如果没有连接,则延迟时间。defaultDelay
numberOfFailedAttempts^2
我尝试了很多不同的风格,我遇到的最大问题是retryWhen's
observable 只评估一次:
Observable
.interval(defaultDelay,TimeUnit.MILLISECONDS)
.observeOn(Schedulers.io())
.repeatWhen((observable) ->
observable.concatMap(repeatObservable -> {
if(internetConnectionDetector.isInternetConnected()){
consecutiveRetries = 0;
return observable;
} else {
consecutiveRetries++;
int backoffDelay = (int)Math.pow(consecutiveRetries,2);
return observable.delay(backoffDelay, TimeUnit.SECONDS);
}
}).onBackpressureDrop())
.onBackpressureDrop();
Run Code Online (Sandbox Code Playgroud)
有什么办法可以做我想做的事情吗?我发现了一个相关的问题(现在无法找到它搜索),但所采取的方法似乎不适用于动态值。
我正在尝试使用 RX Java 来使用来自不断发送对象的源的一些数据。
我想知道如何为我自己的代码抛出异常的情况实施重试策略。例如,网络异常应该使用指数退避策略触发重试。
一些代码:
message.map(this::processMessage)
.subscribe((message)->{
//do something after mapping
});
Run Code Online (Sandbox Code Playgroud)
processMessage(message)
是包含可能失败的风险代码的方法,以及我想重试的代码部分,但我不想阻止 observable 消耗来自源的数据。
对此有何想法?
我正在尝试创建一个可以按以下方式工作的功能:
我已经尝试调试重试函数有一段时间了,但它不起作用,并且在第一次重试后基本上停止了(即使我将尝试次数指定为 100)。我可以做什么来确保它不断重试拉取记录?
代码如下:
// RETRY FUNCTION
func retry(attempts int, sleep time.Duration, f func() error) (err error) {
for i := 0; ; i++ {
err = f()
if err == nil {
return
}
if i >= (attempts - 1) {
break
}
time.Sleep(sleep)
sleep *= 2
log.Println("retrying after error:", err)
}
return fmt.Errorf("after %d attempts, last error: %s", attempts, err) }
//Save Data function
type Records struct { …
Run Code Online (Sandbox Code Playgroud) retry-logic ×4
go ×2
java ×2
rx-java ×2
.net ×1
algorithm ×1
apache-kafka ×1
aws-java-sdk ×1
aws-sdk ×1
c# ×1
java-8 ×1
javascript ×1
messaging ×1
observable ×1
polly ×1
python-3.x ×1
random ×1
spring-kafka ×1
spring-retry ×1