Sha*_*awn 0 .net c# multithreading reactive-programming system.reactive
我已经订阅了一个推送内容的频率很高的Observable,这些内容来自网络I/O,所以每次推送都来自不同的线程,然后我有一些观察者可能会尝试获取一些内容然后快速取消订阅以确保有没有其他内容传入,所以代码示例如下:
IDisposable dsp = null;
dsp = TargetObservable.Subscribe((incomingContent) =>
{
if (incomingContent == "something")
{
myList.Add(incomingContent);
dsp.Dispose();
}
else
{
otherList.Add(incomingContent);
}
});
Run Code Online (Sandbox Code Playgroud)
现在,OnNext显然不是线程安全的,意味着当Observer在调用Dispose()之前得到"东西"时,其他内容可能仍然传入并添加到'otherList',即使我放了一个'lock(.. .)'为整个'onNext(...)'.
这不是我们想要的,所以任何想法都要避免这种情况?我可以考虑的一种方法是修改Observable逐个推送内容(通过使用'lock'),然后性能必须伤害很多.谢谢.
要使用Rx,您需要遵循Rx指南.在你的情况下有4.2问题假设观察者实例以序列化方式调用,解决方案是使用Synchronize哪个基本上介绍lock你想要避免的.如果您lock在代码中无法承担声明,则需要在将网络事件发送到Rx之前编写自己的"廉价"同步.
使用同步序列,您可以OnNext使用Rx LINQ运算符来简化处理程序中的代码,例如TakeWhile:
var subscription = TargetObservable
.Synchronize()
.TakeWhile(incomingContent => incomingContent != "something"))
.Subscribe( ... );
Run Code Online (Sandbox Code Playgroud)
或者您可以创建自己的运算符TakeWhileInclusive以包含谓词为false的最后一项:
static class ObservableExtensions {
public static IObservable<TSource> TakeWhileInclusive<TSource>(
this IObservable<TSource> source,
Func<TSource, Boolean> predicate) {
return Observable
.Create<TSource>(
observer => source.Subscribe(
item => {
observer.OnNext(item);
if (!predicate(item))
observer.OnCompleted();
},
observer.OnError,
observer.OnCompleted
)
);
}
}
Run Code Online (Sandbox Code Playgroud)