Bal*_*ngh 2 c# recursion polling reactive-programming system.reactive
在C#中使用Rx我正在尝试创建REST API的轮询请求.我面临的问题是,Observable需要按顺序发送回复.意味着如果请求A在X时间进行并且请求B在X + dx时间进行并且B的响应在A之前出现,则Observable表达式应该忽略或取消请求A.
我编写了一个示例代码,试图描述该场景.我如何解决它只获得最新的响应,取消或忽略以前的响应.
class Program
{
static int i = 0;
static void Main(string[] args)
{
GenerateObservableSequence();
Console.ReadLine();
}
private static void GenerateObservableSequence()
{
var timerData = Observable.Timer(TimeSpan.Zero,
TimeSpan.FromSeconds(1));
var asyncCall = Observable.FromAsync<int>(() =>
{
TaskCompletionSource<int> t = new TaskCompletionSource<int>();
i++;
int k = i;
var rndNo = new Random().Next(3, 10);
Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
return t.Task;
});
var obs = from t in timerData
from data in asyncCall
select data;
var hot = obs.Publish();
hot.Connect();
hot.Subscribe(j =>
{
Console.WriteLine("{0}", j);
});
}
}
Run Code Online (Sandbox Code Playgroud)
在@Enigmativity回答之后:添加轮询Aync功能以始终采取最新响应:
public static IObservable<T> PollingAync<T> (Func<Task<T>> AsyncCall, double TimerDuration)
{
return Observable
.Create<T>(o =>
{
var z = 0L;
return
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(TimerDuration))
.SelectMany(nr =>
Observable.FromAsync<T>(AsyncCall),
(nr, obj) => new { nr, obj})
.Do(res => z = Math.Max(z, res.nr))
.Where(res => res.nr >= z)
.Select(res => res.obj)
.Subscribe(o);
});
}
Run Code Online (Sandbox Code Playgroud)
这是一种常见的情况,可以简单地修复.
您的示例代码的关键部分是
var obs = from t in timerData
from data in asyncCall
select data;
Run Code Online (Sandbox Code Playgroud)
这可以理解为" timerData获取所有值中的每个值asyncCall".这是SelectMany(或FlatMap)运算符.该SelectMany运营商将采取从内部序列(所有值asyncCall),当他们收到返回的值.这意味着您可以获得无序值.
你想要的是当外部序列(timerData)产生一个新值时取消先前的内部序列.为此,我们希望使用Switch运算符.
var obs = timerData.Select(_=>asyncCall)
.Switch();
Run Code Online (Sandbox Code Playgroud)
完整代码可以清理到以下.(删除冗余发布/连接,按键处理订阅)
class Program {static int i = 0;
static void Main(string[] args)
{
using (GenerateObservableSequence().Subscribe(x => Console.WriteLine(x)))
{
Console.ReadLine();
}
}
private static IObservable<int> GenerateObservableSequence()
{
var timerData = Observable.Timer(TimeSpan.Zero,
TimeSpan.FromSeconds(1));
var asyncCall = Observable.FromAsync<int>(() =>
{
TaskCompletionSource<int> t = new TaskCompletionSource<int>();
i++;
int k = i;
var rndNo = new Random().Next(3, 10);
Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
return t.Task;
});
return from t in timerData
from data in asyncCall
select data;
}
}
Run Code Online (Sandbox Code Playgroud)
- 编辑 -
看起来我误解了这个问题.@Enigmativity提供了更准确的答案.这是他的答案的清理.
//Probably should be a field?
var rnd = new Random();
var obs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
//.Select(n => new { n, r = ++i })
//No need for the `i` counter. Rx does this for us with this overload of `Select`
.Select((val, idx) => new { Value = val, Index = idx})
.SelectMany(nr =>
Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))),
(nr, _) => nr)
//.Do(nr => z = Math.Max(z, nr.n))
//.Where(nr => nr.n >= z)
//Replace external State and Do with scan and Distinct
.Scan(new { Value = 0L, Index = -1 }, (prev, cur) => {
return cur.Index > prev.Index
? cur
: prev;
})
.DistinctUntilChanged()
.Select(nr => nr.Value)
.Dump();
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
464 次 |
| 最近记录: |