小编Mat*_*age的帖子

将历史和实时股票价格数据与Rx合并

我正在尝试Rx,因为它似乎非常适合我们的领域,但学习曲线让我感到意外.

我需要将历史价格数据与实时价格数据结合在一起.

我正在尝试采用通常的方法将其用于Rx的语言:

  1. 立即订阅实时价格并开始缓冲我收到的价值
  2. 发起历史价格数据请求(这需要在订购实时价格后发生,因此我们的数据没有任何差距)
  3. 在他们回来时发布历史价格
  4. 一旦我们收到所有历史数据,发布缓冲的实时数据,删除任何与我们历史数据重叠的值
  5. 继续从实时价格Feed中重播数据

我有这个令人作呕和不正确的稻草人代码,这似乎适用于我写的天真测试案例:

IConnectableObservable<Tick> live = liveService
    .For(symbol)
    .Replay(/* Some appropriate buffer size */);
live.Connect();

IObservable<Tick> historical = historyService.For(since, symbol);

return new[] {historical, live}
    .Concat()
    .Where(TicksAreInChronologicalOrder());

private static Func1<Tick,bool> TicksAreInChronologicalOrder()
{
    // Some stateful predicate comparing the timestamp of this tick 
    // to the timestamp of the last tick we saw
}
Run Code Online (Sandbox Code Playgroud)

这有一些缺点

  1. 适当的重放缓冲区大小未知.无法设置无限缓冲区 - 这是一个长时间运行的序列.我们真的想要在第一次调用Subscribe时刷新某种一次性缓冲区.如果在Rx中存在,我找不到它.
  2. 即使我们切换到发布实时价格,重播缓冲区仍将继续存在.此时我们不需要缓冲区.
  3. 同样,一旦我们跳过历史价格和实时价格之间的初始重叠,就不需要过滤掉重叠价格的谓词.我真的想做一些事情:live.SkipWhile(tick => tick.Timestamp < /* lazily get last timestamp in historical data */).是Wait(this IObservable<TSource>) …

system.reactive

11
推荐指数
1
解决办法
1616
查看次数

Rx .Net TestScheduler-执行立即安排的事件

当我这样做:

testScheduler.Schedule("Hello world",(scheduler, state) => Console.WriteLine(state));
testScheduler.AdvanceTo(testScheduler.Now);
Run Code Online (Sandbox Code Playgroud)

我点击此代码VirtualTimeSchedulerBase:

public void AdvanceTo(TAbsolute time)
{
  int num = this.Comparer.Compare(time, this.Clock);
  if (num < 0)
    throw new ArgumentOutOfRangeException("time");
  if (num == 0)
    return;
Run Code Online (Sandbox Code Playgroud)

num == 0 是的,我退出方法.

我可以打电话testScheduler.Start(),我的行动将会执行.但随后TestScheduler将继续执行其队列中的所有内容.而我希望它在当前时间停止执行操作.

我在TestScheduler上看不到任何其他方法可以让我获得我想要的行为.

这是一个错误,还是正确的行为,但我错过了什么?

编辑:

我误解了.TestScheduler直到它们被安排的日期之后才执行操作.

调度操作会立即将其调度为当前值testScheduler.Now.所以直到它才会被执行Now + 1.

  var testScheduler = new TestScheduler();
  var due = new DateTime();
    testScheduler.Schedule("Hello world", due, (scheduler, s) =>
  {
    Console.WriteLine(s);
    return Disposable.Empty;
  });
  testScheduler.AdvanceTo(due.Ticks);
  // Nothing …
Run Code Online (Sandbox Code Playgroud)

.net c# system.reactive

2
推荐指数
1
解决办法
1542
查看次数

标签 统计

system.reactive ×2

.net ×1

c# ×1