如何使用响应式扩展 (Rx.Net) 等待某个值或直到经过固定时间

Awk*_*der 5 c# system.reactive

我想等待(阻塞)一个线程,直到经过一段时间或另一个流泵送一个值,我认为以下内容可能会实现此目的,但它会抛出异常,因为第一个流是空的,

 // class level subject manipulated by another thread...
 _updates = new Subject<Unit>();
 ...
 // wait for up to 5 seconds before carrying on...    
 var result = Observable.Timer(DateTime.Now.AddSeconds(5))
    .TakeUntil(_updates)
    .Wait();
Run Code Online (Sandbox Code Playgroud)

如何实现阻塞长达 5 秒或直到其他流泵出值的能力?

Evk*_*Evk 4

Observable.Timeout你可以这样使用:

 var result = _updates.Take(1).Timeout(DateTime.Now.AddSeconds(5)).Wait();
Run Code Online (Sandbox Code Playgroud)

我使用Take(1)超时是因为超时期望序列完成,而不仅仅是产生下一个值。超时时会抛出System.TimeoutException.

如果您不想要异常 - 您可以使用Catch来提供一些值:

var result = _updates.Take(1).Timeout(DateTime.Now.AddSeconds(5))
    .Catch(Observable.Return(default(Unit))).Wait();
// should catch specific exception, not all
Run Code Online (Sandbox Code Playgroud)

如果您Unit确实是 @Shlomo 提到的 rx 单元 - 您可以像这样更改它:

var result = _updates.Select(c => (Unit?) c).Take(1)
    .Timeout(DateTime.Now.AddSeconds(5)).Catch(Observable.Return((Unit?) null)).Wait();
Run Code Online (Sandbox Code Playgroud)

或者像往常一样捕获该异常。