Sup*_*JMN 10 .net c# reactive-programming system.reactive
我必须及时查询数据库以了解遗留系统的状态.我想过围绕一个查询包装Observable,但我不知道正确的方法.
基本上,它将是每5秒相同的查询.但我担心我将不得不面对这些问题:
额外细节:
SELECT返回带有状态代码列表的数据集(工作,出错).我几乎可以确定查询应该在另一个线程中执行,但是我不知道observable应该是什么样子的,看过Lee Campbell的Rx简介.
Lee*_*ell 17
这是使用Rx轮询另一个系统的相当经典的案例.大多数人都会使用它Observable.Interval作为他们的首选操作员,对大多数人来说,这样会很好.
但是,您对超时和重试有特定要求.在这种情况下,我认为你最好使用运算符的组合:
Observable.Timer 允许您在指定时间内执行查询Timeout 识别和数据库查询已超限ToObservable()将Task结果映射到可观察的序列.Retry 允许你在超时后恢复Repeat允许您在成功进行数据库查询后继续.这也将保持先前数据库查询完成和下一个数据库查询开始之间的初始时间/间隔.这个工作LINQPad代码段应该显示查询正常工作:
void Main()
{
var pollingPeriod = TimeSpan.FromSeconds(5);
var dbQueryTimeout = TimeSpan.FromSeconds(10);
//You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence.
var rxQueryTimeOut = pollingPeriod + dbQueryTimeout;
var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" });
var query = Observable.Timer(pollingPeriod, scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success
query.StartWith("Seed")
.TimeInterval(scheduler) //Just to debug, print the timing gaps.
.Dump();
}
// Define other methods and classes here
private static int delay = 9;
private static int delayModifier = 1;
public async Task<string> DatabaseQuery()
{
//Oscillate the delay between 3 and 12 seconds
delay += delayModifier;
var timespan = TimeSpan.FromSeconds(delay);
if (delay < 4 || delay > 11)
delayModifier *= -1;
timespan.Dump("delay");
await Task.Delay(timespan);
return "Value";
}
Run Code Online (Sandbox Code Playgroud)
结果如下:
Seed 00:00:00.0125407
Timeout 00:00:15.0166379
Timeout 00:00:15.0124480
Timeout 00:00:15.0004520
Timeout 00:00:15.0013296
Timeout 00:00:15.0140864
Value 00:00:14.0251731
Value 00:00:13.0231958
Value 00:00:12.0162236
Value 00:00:11.0138606
Run Code Online (Sandbox Code Playgroud)
样本的关键部分是....
var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success
Run Code Online (Sandbox Code Playgroud)
编辑: 这是如何达到这个解决方案的进一步解释.https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md