如何使用Observable实现轮询?

Ale*_*rov 9 c# reactive-programming system.reactive async-await rx-java

我有一个参数化的休息调用应该每隔五秒用不同的参数执行:

Observable<TResult> restCall = api.method1(param1);
Run Code Online (Sandbox Code Playgroud)

我需要创建一个Observable<TResult>将每隔5秒轮询一次restCall ,其中包含param1的不同值.如果api调用失败,我需要收到错误并在5秒内进行下一次调用.只有在restCall完成(成功/错误)时才应测量调用之间的间隔.

我目前正在使用RxJava,但.NET示例也会很好.

Jam*_*rld 13

介绍

首先,承认,我是一个.NET人,我知道这种方法使用了一些在Java中没有直接等价的习语.但是我接受你的话,并且继续这样做,这是一个.NET家伙会喜欢的一个很好的问题,希望它会引导你走在rx-java的正确道路上,这是我从未看过的.这是一个很长的答案,但它主要是解释 - 解决方案代码本身很短!

使用Either

我们需要先排序一些工具来帮助解决这个问题.首先是使用该Either<TLeft, TRight>类型.这一点很重要,因为你每次调用的两种可能的结果无论是一个不错的结果,或错误.但是我们需要将它们包装在一个类型中 - 我们不能使用OnError来发送错误,因为这会终止结果流.要么看起来有点像元组,并且更容易处理这种情况.该RXx中库有一个非常全面和良好的执行Either,但这里是使用后跟一个简单的实现有利于我们的目的的简单通用的例子:

var goodResult = Either.Right<Exception,int>(1);
var exception = Either.Left<Exception,int>(new Exception());

/* base class for LeftValue and RightValue types */
public abstract class Either<TLeft, TRight>
{
    public abstract bool IsLeft { get; }
    public bool IsRight { get { return !IsLeft; } }
    public abstract TLeft Left { get; }
    public abstract TRight Right { get;  }    
}

public static class Either
{
    public sealed class LeftValue<TLeft, TRight> : Either<TLeft, TRight>
    {
        TLeft _leftValue;

        public LeftValue(TLeft leftValue)
        {
            _leftValue = leftValue;
        }

        public override TLeft Left { get { return _leftValue; } }
        public override TRight Right { get { return default(TRight); } }
        public override bool IsLeft { get { return true; } }
    }

    public sealed class RightValue<TLeft, TRight> : Either<TLeft, TRight>
    {
        TRight _rightValue;

        public RightValue(TRight rightValue)
        {
            _rightValue = rightValue;
        }

        public override TLeft Left { get { return default(TLeft); } }
        public override TRight Right { get { return _rightValue; } }
        public override bool IsLeft { get { return false; } }
    }

    // Factory functions to create left or right-valued Either instances
    public static Either<TLeft, TRight> Left<TLeft, TRight>(TLeft leftValue)
    {
        return new LeftValue<TLeft, TRight>(leftValue);
    }

    public static Either<TLeft, TRight> Right<TLeft, TRight>(TRight rightValue)
    {
        return new RightValue<TLeft, TRight>(rightValue);
    }
}
Run Code Online (Sandbox Code Playgroud)

请注意,按照惯例,当使用Either建模成功或失败时,右侧用于成功的值,因为它当然是"正确的":)

一些助手功能

我将使用一些辅助函数来模拟问题的两个方面.首先,这是一个生成参数的工厂 - 每次调用它时,它将返回以1开头的整数序列中的下一个整数:

// An infinite supply of parameters
private static int count = 0;
public int ParameterFactory()
{
    return ++count; 
}
Run Code Online (Sandbox Code Playgroud)

接下来,这是一个将Rest调用模拟为IObservable的函数.此函数接受整数并且:

  • 如果整数是偶数,则返回一个立即发送OnError的Observable.
  • 如果整数是奇数,则返回一个字符串,将整数与"-ret"连接起来,但仅在经过一秒后才返回.我们将使用它来检查轮询间隔是否按照您的请求运行 - 作为完成调用之间的暂停,无论它们花费多长时间,而不是常规间隔.

这里是:

// A asynchronous function representing the REST call
public IObservable<string> SomeRestCall(int x)
{
    return x % 2 == 0
        ? Observable.Throw<string>(new Exception())
        : Observable.Return(x + "-ret").Delay(TimeSpan.FromSeconds(1));   
}
Run Code Online (Sandbox Code Playgroud)

现在好位

Below is a reasonably generic reusable function I have called Poll. It accepts an asynchronous function that will be polled, a parameter factory for that function, the desired rest (no pun intended!) interval, and finally an IScheduler to use.

我能想到的最简单的方法是使用Observable.Create它来使用调度程序来驱动结果流.ScheduleAsync是一种使用.NET async/await表单的调度方式.这是一个.NET习惯用法,允许您以强制方式编写异步代码.该async关键字引入了一个异步函数,该函数可以await在其正文中进行一次或多次异步调用,并且仅在调用完成时才会继续.我在这个问题中写了一个关于这种调度风格的长解释,其中包括较旧的递归风格,这种风格可能更容易在rx-java方法中实现.代码如下所示:

public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
    Func<TArg, IObservable<TResult>> asyncFunction,
    Func<TArg> parameterFactory,
    TimeSpan interval,
    IScheduler scheduler)
{
    return Observable.Create<Either<Exception, TResult>>(observer =>
    {
        return scheduler.ScheduleAsync(async (ctrl, ct) => {
            while(!ct.IsCancellationRequested)
            {
                try
                {
                    var result = await asyncFunction(parameterFactory());
                    observer.OnNext(Either.Right<Exception,TResult>(result));
                }
                catch(Exception ex)
                {
                    observer.OnNext(Either.Left<Exception, TResult>(ex));
                }
                await ctrl.Sleep(interval, ct);
            }
        });        
    });    
}
Run Code Online (Sandbox Code Playgroud)

打破这一点,Observable.Create通常是一个用于创建IObservable的工厂,它可以让您对结果发布给观察者的方式进行大量控制.它经常被忽视,支持不必要的复杂原始组合.

在这种情况下,我们使用它来创建一个流,Either<TResult, Exception>以便我们可以返回成功和失败的轮询结果.

Create函数接受一个观察者,该观察者代表我们通过OnNext/OnError/OnCompleted将结果传递给的订阅者.我们需要IDisposableCreate调用中返回一个- 在.NET中,这是订阅者可以取消订阅的句柄.这在这里特别重要,因为轮询将永远继续 - 或者至少它永远不会OnComplete.

ScheduleAsync(或简单Schedule)的结果就是这样一个句柄.处置后,它将取消我们预定的任何未决事件 - 从而结束轮询循环.在我们的例子中,Sleep我们使用管理间隔是撤销的操作,尽管该查询功能可以很容易地修改以接受撤销asyncFunction接受一个CancellationToken为好.

ScheduleAsync方法接受将调用以调度事件的函数.它传递了两个参数,第一个ctrl是调度程序本身.第二个ct是CancellationToken我们可以用来查看是否已经请求取消(由订阅者处理他们的订阅句柄).

轮询本身通过无限循环执行,该循环仅在CancellationToken指示已请求取消时终止.

在循环中,我们可以使用async/await的魔力来异步调用轮询函数,但仍然将它包装在异常处理程序中.这太棒了!假设没有错误,我们将结果作为a的正确值发送Either给观察者via OnNext.如果有异常,我们将其作为a的值发送Either给观察者.最后,我们使用Sleep调度程序上的函数在休息间隔之后调度唤醒调用 - 不要与Thread.Sleep调用混淆,这通常不会阻塞任何线程.请注意,Sleep接受CancellationToken也将被中止的启用!

我想你会同意这是一个很酷的使用async/await来简化本来一个非常棘手的问题!

示例用法

最后,这里有一些测试代码Poll和示例输出 - 对于LINQPad风扇,本答案中的所有代码将在LINQPad中运行,并引用Rx 2.1程序集:

void Main()
{
    var subscription = Poll(SomeRestCall,
                            ParameterFactory,
                            TimeSpan.FromSeconds(5),
                            ThreadPoolScheduler.Instance)
        .TimeInterval()                            
        .Subscribe(x => {
            Console.Write("Interval: " + x.Interval);
            var result = x.Value;
            if(result.IsRight)
                Console.WriteLine(" Success: " + result.Right);
            else
                Console.WriteLine(" Error: " + result.Left.Message);
        });

    Console.ReadLine();    
    subscription.Dispose();
}

Interval: 00:00:01.0027668 Success: 1-ret
Interval: 00:00:05.0012461 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0009684 Success: 3-ret
Interval: 00:00:05.0003127 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0113053 Success: 5-ret
Interval: 00:00:05.0013136 Error: Exception of type 'System.Exception' was thrown.
Run Code Online (Sandbox Code Playgroud)

请注意,如果立即返回错误,则结果之间的间隔为5秒(轮询间隔),或者成功结果为6秒(轮询间隔加模拟REST调用持续时间).

编辑 - 这是一个替代实现,使用ScheduleAsync,但使用旧式递归调度,没有async/await语法.正如您所看到的,它更加混乱 - 但它也支持取消asyncFunction observable.

    public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
        Func<TArg, IObservable<TResult>> asyncFunction,
        Func<TArg> parameterFactory,
        TimeSpan interval,
        IScheduler scheduler)
    {
        return Observable.Create<Either<Exception, TResult>>(
            observer =>
                {
                    var disposable = new CompositeDisposable();
                    var funcDisposable = new SerialDisposable();
                    bool cancelRequested = false;
                    disposable.Add(Disposable.Create(() => { cancelRequested = true; }));
                    disposable.Add(funcDisposable);
                    disposable.Add(scheduler.Schedule(interval, self =>
                        {
                            funcDisposable.Disposable = asyncFunction(parameterFactory())
                                .Finally(() =>
                                    {
                                        if (!cancelRequested) self(interval);
                                    })
                                .Subscribe(
                                    res => observer.OnNext(Either.Right<Exception, TResult>(res)),
                                    ex => observer.OnNext(Either.Left<Exception, TResult>(ex)));
                        }));

                    return disposable;

                });
    }
Run Code Online (Sandbox Code Playgroud)

请参阅我的其他答案,了解避免.NET 4.5异步/等待功能的不同方法,并且不使用Schedule调用.

我希望这对rx-java家伙有所帮助!