Len*_*ena 5 c# system.reactive
我们最近将系统从RX 1.11111移植到RX 2.0并发现了这个问题.我们对ObserveOn使用EventLoopScheduler,如下所示:
IDisposable subscription = someSubject
.ObserveOn(m_eventLoopScheduler)
.SomeMoreRXFunctions()
.Subscribe((something)=>something)
Run Code Online (Sandbox Code Playgroud)
调度程序位于应用程序出口(m_eventLoopScheduler.Dispose)上.在此之前,我们处理observable(subscription.Dispose)的所有订阅.
尽管如此,我们正在进入ObjectDisposedException内部EventLoopScheduler.Schedule.捕获该异常是不可能的,因为它起源于RX线程.这几乎就像Dispose没有摆脱某些队列中的所有项目.
我们试图删除调用,EventLoopScheduler.Dispose异常消失了.但是,SomeMoreRXFunctions()尽管所有订阅都已被处理,但代码仍执行了大约10次.
有没有其他方法可以正确关闭EventLoopScheduler?
(对不起,无法抗拒双关语!)IObservable<out T>,几乎每个Rx操作员实现的界面只有一个重要的方法:
IDisposable Subscribe(IObserver<T> observer);
Run Code Online (Sandbox Code Playgroud)
纯粹通过这种方法并处理它的返回值,观察者(实现IObserver<T>)可以确定订阅何时开始和结束.
当订阅作为链的一部分的Observable时,通常(直接或间接),这将导致订阅进一步向上.确切地说,如果发生这种情况,那么就是Observable.
在许多情况下,订阅收到的订阅之间的关系不是一对一的.一个例子是Publish(),它最多只能有一个订阅源,不管它收到的订阅数量.这真的是Publish的重点.
在其他情况下,这种关系具有时间方面.例如,Concat()不会订阅它的第二个流,直到第一个流OnCompleted()- 这可能永远不会!
值得花一点时间来研究Rx设计指南,因为它们有一些非常相关的话要说:
4.4.尽最大努力停止取消订阅的所有优秀工作.当在可观察订阅上调用取消订阅时,可观察序列将尽最大努力尝试停止所有未完成的工作.这意味着任何尚未启动的排队工作都将无法启动.
任何已在进行中的工作仍可能完成,因为中止正在进行的工作并不总是安全的.此工作的结果不会向任何先前订阅的观察者实例发出信号.
注意这里的含义; 最重要的是,当完成任何上游订阅时,它完全取决于Observable的实现.换句话说,绝对不能保证处理订阅会导致Observable处置它直接或间接发出的任何或所有订阅.这适用于运营商或其上游订阅使用的任何其他资源(例如计划的操作).
您可以期待的最好的是,每个上游运营商的作者确实尽最大努力阻止所有出色的工作.
没有看到SomeMoreRXFunctions我无法确定的内容,但看起来很有可能你所看到的异常是由于 - 尽管处理了你所知道的订阅 - 通过处置调度程序你已经从下面撕下了地毯还在运行订阅的脚.实际上,你造成这种情况:
void Main()
{
var scheduler = new EventLoopScheduler();
// Decide it's time to stop
scheduler.Dispose();
// The next line will throw an ObjectDisposedException
scheduler.Schedule(() => {});
}
Run Code Online (Sandbox Code Playgroud)
编写一个完全合理的运算符很容易导致这个问题 - 即使是不直接使用调度程序的运算符!考虑一下:
public static class ObservableExtensions
{
public static IObservable<TSource> ReasonableDelay<TSource, TDelay>
(this IObservable<TSource> source, IObservable<TDelay> delay)
{
return Observable.Create<TSource>(observer =>
{
var subscription = new SerialDisposable();
subscription.Disposable = delay
.IgnoreElements()
.Subscribe(_ => {}, () => {
Console.WriteLine("Waiting to subscribe to source");
// Artifical sleep to create a problem
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine("Subscribing to source");
// Is this line safe?
subscription.Disposable = source.Subscribe(observer);
});
return subscription;
});
}
}
Run Code Online (Sandbox Code Playgroud)
一旦传递的延迟可观察完成,该运营商将订阅源.看它是多么合理 - 它使用a SerialDisposable来正确地呈现两个潜在的时间上单独的订阅作为一个一次性的观察者.
但是,破坏此运算符并使其导致异常是微不足道的:
void Main()
{
var scheduler = new EventLoopScheduler();
var rx = Observable.Range(0, 10, scheduler)
.ReasonableDelay(Observable.Timer(TimeSpan.FromSeconds(1)));
var subs = rx.Subscribe();
Thread.Sleep(TimeSpan.FromSeconds(2));
subs.Dispose();
scheduler.Dispose();
}
Run Code Online (Sandbox Code Playgroud)
这里发生了什么事?我们正在Range使用EventLoopScheduler 创建一个,但是使用它的默认调度程序ReasonableDelay创建延迟流Timer.
现在我们订阅,等到我们的延迟流完成,然后我们按照"正确的顺序"处理我们的订阅和EventLoopScheduler.
我插入了人工延迟Thread.Sleep确保可以很容易地自然发生的竞争条件-延迟已经完成,认购已被释放,但为时已晚,以防止Range操作人员访问处置EventLoopScheduler.
我们甚至可以加强我们的合理努力,以检查观察员在延迟部分完成后是否取消订阅:
// In the ReasonableDelay method
.Subscribe(_ => {}, () => {
if(!subscription.IsDisposed) // Check for unsubscribe
{
Console.WriteLine("Waiting to subscribe to source");
// Artifical sleep to create a problem
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine("Subscribing to source");
// Is this line safe?
subscription.Disposable = source.Subscribe(observer);
}
});
Run Code Online (Sandbox Code Playgroud)
它无济于事.纯粹在此运算符的上下文中也无法使用锁定语义.
你没有处理EventLoopScheduler的业务!一旦将其传递给其他Rx操作员,您就已经承担了责任.它是由与Rx运营商遵循准则的清理他们的订阅中及时的方式可能 - 这意味着直接或间接地取消对EventLoopScheduler任何未决的计划项目,并停止任何进一步的调度,使得它的队列尽快清空可能.
在上面的例子中,你可以归因问题了几分做作使用多个调度和ReasonableDelay强制睡眠的 - 但它不是很难想象一个真正的场景,运营商不能立即清理.
本质上,通过配置Rx调度程序,您正在执行Rx等效的线程中止.就像那个场景一样,你可能有例外处理!
正确的做法是拉开神秘面纱SomeMoreRXFunctions(),确保他们尽可能地遵守指导方针.
部分解决。这个案例比这里展示的要复杂得多。链条是这样的:
var Publication = someSubject.ObserveOn(m_eventLoopScheduler).SomeMoreRXFunctions().Publish();
IDisposable 一次性1 = 已发布.Connect();
IDisposabledisposable2=published.Subscribe((某事)=>某事);
如果我同时处理了disposable1和disposable2,则SomeMoreRXFunctions()中的代码将不再执行。另一方面,尝试处置调度程序本身仍然会引发相同的异常。
不幸的是,我无法用更简单的代码重现该问题。这可能表明我还缺少其他东西。
这是我们可以接受的解决方案,但我仍然希望找到更好的解决方案,可以立即关闭调度程序,没有例外的机会。
| 归档时间: |
|
| 查看次数: |
1101 次 |
| 最近记录: |