Rx后退并重试

sac*_*cha 9 c# reactive-programming system.reactive

这基于此SO中提供的代码:编写Rx"RetryAfter"扩展方法

我正在使用Markus Olsson的代码(目前仅进行评估),之前有人要求我试图在Github上抓住Markus,但是在我工作的地方被阻止了,所以我觉得我唯一能做的就是问在这里.很抱歉,如果这与任何人都很糟糕.

所以我在一个小的演示中使用以下代码:

class Attempt1
{
    private static bool shouldThrow = true;

    static void Main(string[] args)
    {
        Generate().RetryWithBackoffStrategy(3,
               MyRxExtensions.ExponentialBackoff,
            ex =>
            {
                return ex is NullReferenceException;
            }, Scheduler.TaskPool)
            .Subscribe(
                OnNext,
                OnError
            );

        Console.ReadLine();
    }

    private static void OnNext(int val)
    {
        Console.WriteLine("subscriber value is {0} which was seen on threadId:{1}",
            val, Thread.CurrentThread.ManagedThreadId);
    }

    private static void OnError(Exception ex)
    {
        Console.WriteLine("subscriber bad {0}, which was seen on threadId:{1}",
            ex.GetType(),
            Thread.CurrentThread.ManagedThreadId);
    }

    static IObservable<int> Generate()
    {
        return Observable.Create<int>(
            o =>
            {
                Scheduler.TaskPool.Schedule(() =>
                {
                    if (shouldThrow)
                    {
                        shouldThrow = false;
                        Console.WriteLine("ON ERROR NullReferenceException");
                        o.OnError(new NullReferenceException("Throwing"));
                    }
                    Console.WriteLine("Invoked on threadId:{0}",
                        Thread.CurrentThread.ManagedThreadId);

                    Console.WriteLine("On nexting 1");
                    o.OnNext(1);
                    Console.WriteLine("On nexting 2");
                    o.OnNext(2);
                    Console.WriteLine("On nexting 3");
                    o.OnNext(3);
                    o.OnCompleted();
                    Console.WriteLine("On complete");
                    Console.WriteLine("Finished on threadId:{0}",
                        Thread.CurrentThread.ManagedThreadId);

                });

                return () => { };
            });
    }
}

public static class MyRxExtensions
{
    /// <summary>
    /// An exponential back off strategy which starts with 1 second and then 4, 9, 16...
    /// </summary>
    public static readonly Func<int, TimeSpan>
        ExponentialBackoff = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

    public static IObservable<T> RetryWithBackoffStrategy<T>(
        this IObservable<T> source,
        int retryCount = 3,
        Func<int, TimeSpan> strategy = null,
        Func<Exception, bool> retryOnError = null,
        IScheduler scheduler = null)
    {
        strategy = strategy ?? MyRxExtensions.ExponentialBackoff;

        int attempt = 0;

        return Observable.Defer(() =>
        {
            return ((++attempt == 1) ? source : source.Delay(strategy(attempt - 1), scheduler))
                .Select(item => new Tuple<bool, T, Exception>(true, item, null))
                .Catch<Tuple<bool, T, Exception>, Exception>(e =>
                    retryOnError(e)
                    ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                    : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
        })
        .Retry(retryCount)
        .SelectMany(t => t.Item1
            ? Observable.Return(t.Item2)
            : Observable.Throw<T>(t.Item3));
    }
}
Run Code Online (Sandbox Code Playgroud)

代码对我来说很有意义,如果失败我们尝试做一个操作,我们退回并重试.我们可以指定我们想要重试的异常类型,并且我们还看到订阅者在重试之后只能看到最终值一次(在Exception上面的演示代码中只完成了(第一次出现OnError)).

所以通常代码按预期工作,除了一件事.

如果我查看输出上面的代码产生我得到这个:

ON ERROR NullReferenceException 
Invoked on threadId:10 
On nexting 1
Invoked on threadId:11 
On nexting 1 
On nexting 2 
On nexting 3 
On complete 
Finished on threadId:10 
On nexting 2 
On nexting 3 
On complete 
Finished on threadId:11 
subscriber value is 1 which was seen on threadId:10 
subscriber value is 2 which was seen on threadId:10
subscriber value is 3 which was seen on threadId:10
Run Code Online (Sandbox Code Playgroud)

对我来说有趣的是,订阅者值都是一次性的,我希望当Generate()方法中的OnNext被调用时,Subscribers OnNext会将OnNext的值写入Console输出.

任何人都可以解释为什么会这样吗?

Jam*_*rld 10

这是因为你Delay在结果流上放了一个.(ExponentialBackoff在第二次迭代中传递给的n的值为1,延迟为1秒.)

Delay在源上运行,但源正常进行. Delay安排收到的结果在指定的持续时间后发出.因此订阅者在Generate的逻辑运行完成后获得结果.

如果你考虑一下这是Delay必须的 - 否则Delay将能够以某种方式干扰上游运营商!

作为一个缓慢的消费者,可能会干扰上游运营商(不会抛出异常).但对于一个简单Delay的行为来说,这肯定是一个非常糟糕的方式.

我认为这不是Delay你想要的 - 因为Delay不会延迟它的订阅.如果你使用它DelaySubscription,你会得到你想到的东西.这也是链接问题中使用的内容.

你的问题提供之间的差异有很大的插图DelayDelaySubscription!Defer这里也值得考虑.

这三者之间的区别是微妙但重要的,所以让我们总结三者:

  • Delay- 立即调用目标操作员以立即获得IObservable其在目标上的Subscribe调用Subscribe,在指定的指定延迟之后调度事件以进行传递Scheduler.

  • DelaySubscription- 立即呼叫目标操作员以获得IObservable.在指定的指定延迟之后执行目标的Subscribe计划.SubscribeScheduler

  • Defer - 没有目标运营商.在Subscribe运行时提供工厂功能来获取目标IObservable并立即调用Subscribe.没有延迟添加,因此没有Scheduler指定.


sac*_*cha 8

对于发布在这篇文章上的其他任何人来说,确实是由詹姆斯世界和布兰登提出的建议(感谢chaps).

这是完整的工作代码

class Attempt1
{
    private static bool shouldThrow = true;

    static void Main(string[] args)
    {
        Generate().RetryWithBackoffStrategy(3, MyRxExtensions.ExponentialBackoff,
            ex =>
            {
                return ex is NullReferenceException;
            }, Scheduler.TaskPool)
            .Subscribe(
                OnNext,
                OnError
            );

        Console.ReadLine();
    }

    private static void OnNext(int val)
    {
        Console.WriteLine("subscriber value is {0} which was seen on threadId:{1}", 
            val, Thread.CurrentThread.ManagedThreadId);
    }

    private static void OnError(Exception ex)
    {
        Console.WriteLine("subscriber bad {0}, which was seen on threadId:{1}", 
            ex.GetType(),
            Thread.CurrentThread.ManagedThreadId);
    }

    static IObservable<int> Generate()
    {
        return Observable.Create<int>(
            o =>
            {
                Scheduler.TaskPool.Schedule(() =>
                    {
                        if (shouldThrow)
                        {
                            shouldThrow = false;
                            Console.WriteLine("ON ERROR NullReferenceException");
                            o.OnError(new NullReferenceException("Throwing"));
                        }
                        Console.WriteLine("Invoked on threadId:{0}", 
                            Thread.CurrentThread.ManagedThreadId);

                        Console.WriteLine("On nexting 1");
                        o.OnNext(1);
                        Console.WriteLine("On nexting 2");
                        o.OnNext(2);
                        Console.WriteLine("On nexting 3");
                        o.OnNext(3);
                        o.OnCompleted();
                        Console.WriteLine("On complete");
                        Console.WriteLine("Finished on threadId:{0}", 
                            Thread.CurrentThread.ManagedThreadId);
                    });

                return () => { };
            });
    }
}

public static class MyRxExtensions
{
    /// <summary>
    /// An exponential back off strategy which starts with 1 second and then 4, 9, 16...
    /// </summary>
    public static readonly Func<int, TimeSpan> ExponentialBackoff = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

    public static IObservable<T> RetryWithBackoffStrategy<T>(
        this IObservable<T> source,
        int retryCount = 3,
        Func<int, TimeSpan> strategy = null,
        Func<Exception, bool> retryOnError = null,
        IScheduler scheduler = null)
    {
        strategy = strategy ?? MyRxExtensions.ExponentialBackoff;

        int attempt = 0;

        return Observable.Defer(() =>
        {
            return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1), scheduler))
                .Select(item => new Tuple<bool, T, Exception>(true, item, null))
                .Catch<Tuple<bool, T, Exception>, Exception>(e =>
                    retryOnError(e)
                    ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                    : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
        })
        .Retry(retryCount)
        .SelectMany(t => t.Item1
            ? Observable.Return(t.Item2)
            : Observable.Throw<T>(t.Item3));
    }

    public static IObservable<T> DelaySubscription<T>(this IObservable<T> source, 
        TimeSpan delay, IScheduler scheduler = null)
    {
        if (scheduler == null)
        {
            return Observable.Timer(delay).SelectMany(_ => source);
        }
        return Observable.Timer(delay, scheduler).SelectMany(_ => source);
    }
}
Run Code Online (Sandbox Code Playgroud)

这产生了所需的输出

ON ERROR NullReferenceException
Invoked on threadId:11
On nexting 1
On nexting 2
On nexting 3
On complete
Finished on threadId:11
Invoked on threadId:11
On nexting 1
subscriber value is 1 which was seen on threadId:11
On nexting 2
subscriber value is 2 which was seen on threadId:11
On nexting 3
subscriber value is 3 which was seen on threadId:11
On complete
Finished on threadId:11
Run Code Online (Sandbox Code Playgroud)