Reactive:IObservable接收的最新值

vdh*_*ant 8 .net c# system.reactive

我知道以下是阻塞调用,并将返回可观察序列中的第一个值:

var result = myObservable.First();
Run Code Online (Sandbox Code Playgroud)

根据我使用的主题类型,这有不同的含义:

  • Subject - First()将阻塞,直到下一次调用OnNext(),这意味着最终这将是最新的值
  • BehaviorSubject - First()将阻塞,直到通过OnNext()推送至少一个值,并且因为BehaviorSubject跟踪最后一个值,这将是最新值
  • ReplaySubject - First()将阻塞,直到通过OnNext()推送了至少一个值,但是如果许多项目已经通过OnNext推送,那么它将是其缓冲区中的第一个,而不是最后一个

现在我试图找到一种获取最后一个值的一致方法,无论使用哪个底层Observable.

有任何想法吗?

Ric*_*ein 4

根据您的其他 Rx 问题,我认为您想要这个:

var rootSubject = new ReplaySubject<Types>();
var firstSubject = rootSubject.Where(x => x == Types.First);
var secondSubject = rootSubject.Where(x => x == Types.Second);
var thirdSubject = rootSubject.Where(x => x == Types.Third);
var forthSubject = rootSubject.Where(x => x == Types.Fourth);

var mergedSubject = Observable
              .Merge(firstSubject, secondSubject, thirdSubject, forthSubject)
        .Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))    
        .Replay();

mergedSubject.Connect();

rootSubject.OnNext(Types.First);
rootSubject.OnNext(Types.Second);

var result = mergedSubject.First();

rootSubject.OnNext(Types.Third);
rootSubject.OnNext(Types.Fourth);

Console.WriteLine(String.Format("result - {0}", result));
Run Code Online (Sandbox Code Playgroud)

现在无论使用哪种Subject,它们都返回“结果 - First”。

如果您想在调用 mergedSubject.First() 之前获得最新值,您可以使用 Replay(1):

var mergedSubject = Observable
                .Merge(firstSubject, secondSubject, thirdSubject, forthSubject)
                .Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))    
                .Replay(1);
Run Code Online (Sandbox Code Playgroud)

在这种情况下,所有主题类型都将返回“结果 - Second”。