ie1*_*ie1 1 c# reactive-programming system.reactive
当我订阅有时抛出异常的方法时,我会得到2个不同的行为.如果我在中间连接LINQ方法,那么订阅就会被处理掉,另外一点也不是,为什么呢?
void main(){
var numbersSubject=new Subject<int>();
numbersSubject.subscribe(throwMethod); // 1,2,3,4,6,7,8,9,10
// numbersSubject.select(num=>num).subscribe(throwMethod); // 1,2,3,4
for(int i=0;i<10;i++)
{
try{
numbersSubject.OnNext(i);
}catch{}
}
}
void throwMethod(int num)
{
if(num==5)
throw new Exception();
Console.writeLine(i);
}
Run Code Online (Sandbox Code Playgroud)
所以,澄清一下:
当运行没有 LINQ运算符的版本时,我们会看到:
0,1,2,3,4,6,7,8,9
Run Code Online (Sandbox Code Playgroud)
当运行LINQ运算符的版本时,我们看到:
0,1,2,3,4
Run Code Online (Sandbox Code Playgroud)
值得注意的是,当您向第二个版本订阅一个好的和坏的行为Observer时,您将获得如下所示的输出(注释中的输出),如下所示:
numbersSubject.Subscribe(throwMethod);
var source = numbersSubject.Select(num=>num);
source.Subscribe(Console.WriteLine); // 0,1,2,3,4,5,6,7,8,9
source.Subscribe(throwMethod); // 0,1,2,3,4
Run Code Online (Sandbox Code Playgroud)
请注意,"好"观察者获取所有事件.
原因是构建运算符有一个保护层来处理坏Observer 的订阅.
从Rx源代码我们看到:
为了适当的资源清理,需要保护管道免受恶意观察员的侵害.请考虑以下示例:
var xs = Observable.Interval(TimeSpan.FromSeconds(1));
var ys = <some random sequence>;
var res = xs.CombineLatest(ys, (x, y) => x + y);
Run Code Online (Sandbox Code Playgroud)
上面查询的大理石图如下所示:
xs -----0-----1-----2-----3-----4-----5-----6-----7-----8-----9---...
| | | | | | | | |
ys --------4--+--5--+-----+--2--+--1--+-----+-----+--0--+-----+---...
| | | | | | | | | | | | | |
v v v v v v v v v v v v v v
res --------4--5--6--7-----8--5--6--5--6-----7-----8--7--8-----9---...
|
@#&
Run Code Online (Sandbox Code Playgroud)
注意Rx的自由线程特性,其中结果序列上的消息由两个输入序列中的任何一个产生CombineLatest.
现在假设在上面标记的指示点OnNext的观察者的回调中发生异常.回调在上下文中运行,因此异常将取消调度程序线程.这本身就是一个问题(可以通过运算符来缓解),但请注意生成的计时器是如何保持活动的.res@#&ysysCatchISchedulerxs
安全保护代码确保在用户回调抛出时处理所获取的资源.
以上是在称为内部类中实现AutoDetachObserver和包裹通过SafeObserver所使用的大多数内部运算符.
所有这一切都是使用try...finally异常处理程序包装每个OnXXX调用,其中finally块在发生错误时处理订阅 - 例如OnNext:
var __noError = false;
try
{
observer.OnNext(value);
__noError = true;
}
finally
{
if (!__noError)
Dispose();
}
Run Code Online (Sandbox Code Playgroud)
Subject,出于性能原因,没有这层保护.添加它(以及防止其他滥用)的快速方法是将Synchronize()操作符添加到主题中.例如:
var numbersSubject = new Subject<int>();
var source = numbersSubject.Synchronize();
source.Subscribe(throwMethod);
Run Code Online (Sandbox Code Playgroud)
会输出
0,1,2,3,4,6,7,8,9
Run Code Online (Sandbox Code Playgroud)
但Synchronize如图所示添加:
var numbersSubject = new Subject<int>();
var source = numbersSubject.Synchronize();
source.Synchronize().Subscribe(throwMethod);
Run Code Online (Sandbox Code Playgroud)
会输出
0,1,2,3,4
Run Code Online (Sandbox Code Playgroud)
与其他内置运算符一致(加上你实现的任何运算符Observable.Create).
在评论的提示下,这里有一些关于处理从观察者抛出的异常的附加说明.
当异常来自OnNext处理程序时,它位于堆栈中的Rx代码之下,因此无法负责地将其"返回"传递给用户.此时用户必须被视为死亡.我们可以合理地做的就是处理订阅并清理由此产生的资源.这是基于推送的代码的结果.与IEnumerable对比,可以将异常抛出到客户端代码,因为它是客户端执行拉动.
请注意,某些运算符包含用户提供的逻辑(如Where运算符中的谓词表达式),它将通过OnError通道将错误传播给观察者,但是一旦观察者通过在其自己的代码中抛出异常而死亡 - 就是这样.它不会再通过任何OnXXX方法调用.
在这部史诗般的Bart de Smet帖子的一半左右开始,还有更多的内容.
| 归档时间: |
|
| 查看次数: |
188 次 |
| 最近记录: |