我正在尝试Rx,因为它似乎非常适合我们的领域,但学习曲线让我感到意外.
我需要将历史价格数据与实时价格数据结合在一起.
我正在尝试采用通常的方法将其用于Rx的语言:
我有这个令人作呕和不正确的稻草人代码,这似乎适用于我写的天真测试案例:
IConnectableObservable<Tick> live = liveService
.For(symbol)
.Replay(/* Some appropriate buffer size */);
live.Connect();
IObservable<Tick> historical = historyService.For(since, symbol);
return new[] {historical, live}
.Concat()
.Where(TicksAreInChronologicalOrder());
private static Func1<Tick,bool> TicksAreInChronologicalOrder()
{
// Some stateful predicate comparing the timestamp of this tick
// to the timestamp of the last tick we saw
}
Run Code Online (Sandbox Code Playgroud)
这有一些缺点
live.SkipWhile(tick => tick.Timestamp < /* lazily get last timestamp in historical data */).是Wait(this IObservable<TSource>) …当我这样做:
testScheduler.Schedule("Hello world",(scheduler, state) => Console.WriteLine(state));
testScheduler.AdvanceTo(testScheduler.Now);
Run Code Online (Sandbox Code Playgroud)
我点击此代码VirtualTimeSchedulerBase:
public void AdvanceTo(TAbsolute time)
{
int num = this.Comparer.Compare(time, this.Clock);
if (num < 0)
throw new ArgumentOutOfRangeException("time");
if (num == 0)
return;
Run Code Online (Sandbox Code Playgroud)
num == 0 是的,我退出方法.
我可以打电话testScheduler.Start(),我的行动将会执行.但随后TestScheduler将继续执行其队列中的所有内容.而我希望它在当前时间停止执行操作.
我在TestScheduler上看不到任何其他方法可以让我获得我想要的行为.
这是一个错误,还是正确的行为,但我错过了什么?
编辑:
我误解了.TestScheduler直到它们被安排的日期之后才执行操作.
调度操作会立即将其调度为当前值testScheduler.Now.所以直到它才会被执行Now + 1.
var testScheduler = new TestScheduler();
var due = new DateTime();
testScheduler.Schedule("Hello world", due, (scheduler, s) =>
{
Console.WriteLine(s);
return Disposable.Empty;
});
testScheduler.AdvanceTo(due.Ticks);
// Nothing …Run Code Online (Sandbox Code Playgroud)