使用Rx为webservice调用创建轮询请求

Bal*_*ngh 2 c# recursion polling reactive-programming system.reactive

在C#中使用Rx我正在尝试创建REST API的轮询请求.我面临的问题是,Observable需要按顺序发送回复.意味着如果请求A在X时间进行并且请求B在X + dx时间进行并且B的响应在A之前出现,则Observable表达式应该忽略或取消请求A.

我编写了一个示例代码,试图描述该场景.我如何解决它只获得最新的响应,取消或忽略以前的响应.

 class Program
    {
        static int i = 0;

        static void Main(string[] args)
        {
            GenerateObservableSequence();

            Console.ReadLine();
        }

        private static void GenerateObservableSequence()
        {
            var timerData = Observable.Timer(TimeSpan.Zero,
                TimeSpan.FromSeconds(1));

            var asyncCall = Observable.FromAsync<int>(() =>
            {
                TaskCompletionSource<int> t = new TaskCompletionSource<int>();
                i++;

                int k = i;
                var rndNo = new Random().Next(3, 10);
                Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
                return t.Task;
            });

            var obs = from t in timerData
            from data in asyncCall
            select data;

            var hot = obs.Publish();
            hot.Connect();

                hot.Subscribe(j => 
            {
                Console.WriteLine("{0}", j);
            });
        }
    }
Run Code Online (Sandbox Code Playgroud)

在@Enigmativity回答之后:添加轮询Aync功能以始终采取最新响应:

 public static IObservable<T> PollingAync<T> (Func<Task<T>> AsyncCall, double TimerDuration)
        {
            return Observable
         .Create<T>(o =>
         {
             var z = 0L;
             return
                 Observable
                     .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(TimerDuration))
                     .SelectMany(nr =>
                         Observable.FromAsync<T>(AsyncCall),
                         (nr, obj) => new { nr, obj})
                     .Do(res => z = Math.Max(z, res.nr))
                     .Where(res => res.nr >= z)
                     .Select(res => res.obj)
                     .Subscribe(o);
         });

    }
Run Code Online (Sandbox Code Playgroud)

Lee*_*ell 5

这是一种常见的情况,可以简单地修复.

您的示例代码的关键部分是

var obs = from t in timerData
          from data in asyncCall
          select data;
Run Code Online (Sandbox Code Playgroud)

这可以理解为" timerData获取所有值中的每个值asyncCall".这是SelectMany(或FlatMap)运算符.该SelectMany运营商将采取从内部序列(所有值asyncCall),当他们收到返回的值.这意味着您可以获得无序值.

你想要的是当外部序列(timerData)产生一个新值时取消先前的内部序列.为此,我们希望使用Switch运算符.

var obs = timerData.Select(_=>asyncCall)
                   .Switch();
Run Code Online (Sandbox Code Playgroud)

完整代码可以清理到以下.(删除冗余发布/连接,按键处理订阅)

class Program {static int i = 0;

    static void Main(string[] args)
    {
        using (GenerateObservableSequence().Subscribe(x => Console.WriteLine(x)))
        {
            Console.ReadLine();
        }
    }

    private static IObservable<int> GenerateObservableSequence()
    {
        var timerData = Observable.Timer(TimeSpan.Zero,
            TimeSpan.FromSeconds(1));

        var asyncCall = Observable.FromAsync<int>(() =>
        {
            TaskCompletionSource<int> t = new TaskCompletionSource<int>();
            i++;

            int k = i;
            var rndNo = new Random().Next(3, 10);
            Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
            return t.Task;
        });

        return from t in timerData
               from data in asyncCall
               select data;
    }
}
Run Code Online (Sandbox Code Playgroud)

- 编辑 -

看起来我误解了这个问题.@Enigmativity提供了更准确的答案.这是他的答案的清理.

//Probably should be a field?
var rnd = new Random();
var obs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
        //.Select(n => new { n, r = ++i })
        //No need for the `i` counter. Rx does this for us with this overload of `Select`
        .Select((val, idx) => new { Value = val, Index = idx})
        .SelectMany(nr =>
            Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))),
            (nr, _) => nr)
        //.Do(nr => z = Math.Max(z, nr.n))
        //.Where(nr => nr.n >= z)
        //Replace external State and Do with scan and Distinct
        .Scan(new { Value = 0L, Index = -1 }, (prev, cur) => {
            return cur.Index > prev.Index
                ? cur
                : prev;
        })
        .DistinctUntilChanged()
        .Select(nr => nr.Value)
        .Dump();
Run Code Online (Sandbox Code Playgroud)