写一个Rx"RetryAfter"扩展方法

nve*_*aud 22 c# system.reactive

IntroToRx书中,作者建议为I/O编写一个"智能"重试,在一段时间后重试I/O请求,如网络请求.

这是确切的段落:

添加到您自己的库的有用扩展方法可能是"Back Off and Retry"方法.我与之合作过的团队在执行I/O时发现了这样一个特性,尤其是网络请求.这个概念是尝试,并在失败时等待一段时间,然后再试一次.您的此方法版本可能会考虑您要重试的异常类型,以及重试的最大次数.您甚至可能希望延长等待时间,以便在每次后续重试时不那么激进.

不幸的是,我无法弄清楚如何编写这种方法.:(

Mar*_*son 37

这种实现后退重试的关键是延迟可观察性.延迟的observable将不会执行其工厂,直到有人订阅它.它将为每个订阅调用工厂,使其成为我们重试方案的理想选择.

假设我们有一个触发网络请求的方法.

public IObservable<WebResponse> SomeApiMethod() { ... }
Run Code Online (Sandbox Code Playgroud)

出于这个小片段的目的,让我们将延迟定义为 source

var source = Observable.Defer(() => SomeApiMethod());
Run Code Online (Sandbox Code Playgroud)

每当有人订阅源时,它将调用SomeApiMethod并启动新的Web请求.每当它失败时重试它的天真方法是使用内置的重试操作符.

source.Retry(4)
Run Code Online (Sandbox Code Playgroud)

这虽然对API来说不是很好,但它不是你所要求的.我们需要在每次尝试之间延迟发出请求.一种方法是延迟订阅.

Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4)
Run Code Online (Sandbox Code Playgroud)

这是不理想的,因为它甚至会在第一次请求时添加延迟,让我们解决这个问题.

int attempt = 0;
Observable.Defer(() => { 
   return ((++attempt == 1)  ? source : source.DelaySubscription(TimeSpan.FromSeconds(1)))
})
.Retry(4)
.Select(response => ...)
Run Code Online (Sandbox Code Playgroud)

暂停一秒钟并不是一个非常好的重试方法,所以让我们将该常量更改为一个接收重试计数并返回适当延迟的函数.指数退避很容易实现.

Func<int, TimeSpan> strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))
Run Code Online (Sandbox Code Playgroud)

我们现在差不多完成了,我们只需要添加一种指定我们应该重试的异常的方法.让我们添加一个函数,给定异常返回是否有意义重试,我们称之为retryOnError.

现在我们需要编写一些可怕的代码,但请耐心等待.

Observable.Defer(() => {
    return ((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))
        .Select(item => new Tuple<bool, WebResponse, Exception>(true, item, null))
        .Catch<Tuple<bool, WebResponse, Exception>, Exception>(e => retryOnError(e)
            ? Observable.Throw<Tuple<bool, WebResponse, Exception>>(e)
            : Observable.Return(new Tuple<bool, WebResponse, Exception>(false, null, e)));
})
.Retry(retryCount)
.SelectMany(t => t.Item1
    ? Observable.Return(t.Item2)
    : Observable.Throw<T>(t.Item3))
Run Code Online (Sandbox Code Playgroud)

所有这些尖括号都可以编组一个我们不应该重试的异常.Retry().IObservable<Tuple<bool, WebResponse, Exception>>如果我们有响应或异常,我们已经将内部可观察量设置为第一个bool指示的位置.如果retryOnError指示我们应该重试特定异常,则内部observable将抛出,并且将通过重试来获取.SelectMany只是解开我们的元组并使得最终的观察结果IObservable<WebRequest>再次出现.

请参阅我的要点,获取完整的源代码并测试最终版本.有了这个运算符,我们可以非常简洁地编写我们的重试代码

Observable.Defer(() => SomApiMethod())
  .RetryWithBackoffStrategy(
     retryCount: 4, 
     retryOnError: e => e is ApiRetryWebException
  )
Run Code Online (Sandbox Code Playgroud)

  • 当使用指数退避(或基本上任何基于“尝试”的退避策略)时,退避时间永远不会重置可能是出乎意料的。我的意思是 `attempt` 会随着可观察对象产生的 **每个** 错误而增加,无论两者之间有多少 *good* 值。 (2认同)

Dan*_*ith 11

也许我过度简化了这种情况,但是如果我们看一下Retry的实现,它只是一个Observable.Catch,它包含了无数可观数的observable:

private static IEnumerable<T> RepeatInfinite<T>(T value)
{
    while (true)
        yield return value;
}

public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source)
{
    return Observable.Catch<TSource>(QueryLanguage.RepeatInfinite<IObservable<TSource>(source));
}
Run Code Online (Sandbox Code Playgroud)

因此,如果我们采用这种方法,我们可以在第一次收益后添加延迟.

private static IEnumerable<IObservable<TSource>> RepeateInfinite<TSource> (IObservable<TSource> source, TimeSpan dueTime)
{
    // Don't delay the first time        
    yield return source;

    while (true)
        yield return source.DelaySubscription(dueTime);
    }

public static IObservable<TSource> RetryAfterDelay<TSource>(this IObservable<TSource> source, TimeSpan dueTime)
{
    return RepeateInfinite(source, dueTime).Catch();
}
Run Code Online (Sandbox Code Playgroud)

通过重试计数捕获特定异常的重载可以更简洁:

public static IObservable<TSource> RetryAfterDelay<TSource, TException>(this IObservable<TSource> source, TimeSpan dueTime, int count) where TException : Exception
{
    return source.Catch<TSource, TException>(exception =>
    {
        if (count <= 0)
        {
            return Observable.Throw<TSource>(exception);
        }

        return source.DelaySubscription(dueTime).RetryAfterDelay<TSource, TException>(dueTime, --count);
    });
}
Run Code Online (Sandbox Code Playgroud)

请注意,此处的重载是使用递归.在第一次出现时,如果count类似于Int32.MaxValue,似乎可能出现StackOverflowException.但是,DelaySubscription使用调度程序来运行订阅操作,因此无法进行堆栈溢出(即使用"trampolining").我想通过查看代码并不是很明显.我们可以通过将DelaySubscription重载中的调度程序显式设置为Scheduler.Immediate,并传入TimeSpan.Zero和Int32.MaxValue来强制堆栈溢出.我们可以传入一个非立即调度程序来更明确地表达我们的意图,例如:

return source.DelaySubscription(dueTime, TaskPoolScheduler.Default).RetryAfterDelay<TSource, TException>(dueTime, --count);
Run Code Online (Sandbox Code Playgroud)

更新:添加重载以接收特定的调度程序.

public static IObservable<TSource> RetryAfterDelay<TSource, TException>(
    this IObservable<TSource> source,
    TimeSpan retryDelay,
    int retryCount,
    IScheduler scheduler) where TException : Exception
{
    return source.Catch<TSource, TException>(
        ex =>
        {
            if (retryCount <= 0)
            {
                return Observable.Throw<TSource>(ex);
            }

            return
                source.DelaySubscription(retryDelay, scheduler)
                    .RetryAfterDelay<TSource, TException>(retryDelay, --retryCount, scheduler);
        });
} 
Run Code Online (Sandbox Code Playgroud)