订阅者缺少消息; 这是Rx的错误还是我做错了?

fah*_*ash 1 c# wpf reactive-programming system.reactive

我有一个Window WPF应用程序与以下构造函数

        numbers = Observable.Generate(DateTime.Now,
                                         time => true,
                                         time => DateTime.Now,
                                         time => { return new     Random(DateTime.Now.Millisecond).NextDouble() * 99 + 2; },
                                         time => TimeSpan.FromSeconds(1.0));


        numbers.ObserveOnDispatcher()
            .Subscribe(s => list1.Items.Add(s.ToString("##.00")));

        numbers.Where(n => n < 10).ObserveOnDispatcher().
            Subscribe(s => list2.Items.Add(s.ToString("##.00")));
Run Code Online (Sandbox Code Playgroud)

现在这里是列表的屏幕截图 - 左侧列表中缺少通知3.76 ...此行为是间歇性的.

图片

Eni*_*ity 7

简短的回答是你做错了.Rx工作得很好.

创建observable时,您将创建一系列值随时间的定义,而不是随时间推移的实际值序列.这意味着每当您拥有可观察对象的订阅者时,您就会为每个订阅者创建可观察对象的新实例.

因此,在您的情况下,您有两个此序列运行的实例:

var numbers =
    Observable
        .Generate(
            DateTime.Now,
            time => true,
            time => DateTime.Now,
            time => new Random(DateTime.Now.Millisecond)
                .NextDouble() * 99 + 2,
            time => TimeSpan.FromSeconds(1.0));
Run Code Online (Sandbox Code Playgroud)

现在,由于您立即连续两次订阅此可观察对象,因此该可观察对象的两个实例将尝试在几乎同时生成值.因此,DateTime.Now.Millisecond大多数时候价值都是一样的,但现在总是如此.返回的值new Random(x).NextDouble()对于相同的值是相同的x,因此大多数时候为什么从observable的两个实例中获得相同的值.只有在DateTime.Now.Millisecond不同的情况下,您才能获得两个不同的值,而且订阅者似乎缺少值.


这是一个替代版本,应该按照您最初的预期工作:

var rnd = new Random((int)DateTime.Now.Ticks);

var numbers =
    Observable
        .Generate(0, n => true, n => 0,
            n => rnd.NextDouble() * 99 + 2,
            n => TimeSpan.FromSeconds(1.0));

var publishedNumbers = numbers.Publish();

publishedNumbers
    .ObserveOnDispatcher()
    .Subscribe(s => list1.Items.Add(s.ToString("##.00")));

publishedNumbers
    .Where(n => n < 10)
    .ObserveOnDispatcher()
    .Subscribe(s => list2.Items.Add(s.ToString("##.00")));

publishedNumbers.Connect();
Run Code Online (Sandbox Code Playgroud)

  • @fahadash - 将其更改为秒并不能解决问题,它只会降低1000倍的可能性.然后你就不能正确地产生随机数.你需要重构observable并使用`Publish`来共享observable. (3认同)
  • ..我认为数字序列可以进一步简化为`Observable.Interval(TimeSpan.FromSeconds(1.0)).选择(_ => rnd.NextDouble()*99 + 2)`.我认为这更具表现力,即每秒,从这个序列中投射一个随机数. (2认同)