Ron*_*erg 3 .net c# system.reactive
我有一个IObservable<byte[]>我转换成IObservable<XDocument>使用一些中间步骤:
var observedXDocuments =
from b in observedBytes
// Lot of intermediate steps to transform byte arrays into XDocuments
select xDoc;
Run Code Online (Sandbox Code Playgroud)
在某个时间点,我对观察到的XDocuments 感兴趣所以我订阅了IObserver<XDocument>.在稍后的时间点,我想订阅另一个IObserver<XDocument>并处理旧的.
如何在一次原子操作中完成此操作,而不会丢失任何观察到的XDocument?我可以这样做:
oldObserver.Dispose();
observedXDocuments.Subscribe(newObserver);
Run Code Online (Sandbox Code Playgroud)
我很担心,在这两个电话之间,我可以放松一下XDocument.如果我切换两个电话,我可能会收到XDocument两次相同的电话.
我可能会添加一层间接.编写一个名为ExchangeableObserver的类,将其订阅到您的observable,并保持永久订阅.ExchangeableObserver的工作是将所有内容委托给给定的子观察者.但是程序员可以随时更改被委派给的子观察者.在我的例子中,我有一个Exchange()方法.就像是:
public class ExchangeableObserver<T> : IObserver<T> {
private IObserver<T> inner;
public ExchangeableObserver(IObserver<T> inner) {
this.inner=inner;
}
public IObserver<T> Exchange(IObserver<T> newInner) {
return Interlocked.Exchange(ref inner, newInner);
}
public void OnNext(T value) {
inner.OnNext(value);
}
public void OnCompleted() {
inner.OnCompleted();
}
public void OnError(Exception error) {
inner.OnError(error);
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
212 次 |
| 最近记录: |