Abh*_*tel 12 reactive-programming system.reactive
如果发生错误,有没有办法让一个可观察的序列继续执行序列中的下一个元素?从这篇文章看起来你需要在Catch()中指定一个新的可观察序列来恢复执行,但是如果你需要继续处理序列中的下一个元素呢?有没有办法实现这个目标?
更新:场景如下:我有一堆我需要处理的元素.处理由一系列步骤组成.我已将这些步骤分解为我想要编写的任务.我按照这里发布的ToObservable()指南 将任务转换为可观察的构图.所以我基本上都是这样做的 -
foreach(element in collection)
{
var result = from aResult in DoAAsync(element).ToObservable()
from bResult in DoBAsync(aResult).ToObservable()
from cResult in DoCAsync(bResult).ToObservable()
select cResult;
result.subscribe( register on next and error handlers here)
}
Run Code Online (Sandbox Code Playgroud)
或者我可以这样:
var result =
from element in collection.ToObservable()
from aResult in DoAAsync(element).ToObservable()
from bResult in DoBAsync(aResult).ToObservable()
from cResult in DoCAsync(bResult).ToObservable()
select cResult;
Run Code Online (Sandbox Code Playgroud)
这里继续处理其他元素的最佳方法是什么,即使让我们说其中一个元素的处理会引发异常.我希望能够记录错误并理想地继续前进.
Eni*_*ity 12
詹姆斯和理查德都提出了一些好处,但我认为他们没有给你解决问题的最佳方法.
詹姆斯建议使用.Catch(Observable.Never<Unit>()).当他说"将......允许流继续"时他错了,因为一旦你遇到异常,流就必须结束 - 这就是理查德在提到观察者与观察者之间的契约时所指出的.
此外,Never以这种方式使用将导致您的可观察量永远不会完成.
简短的回答是,这.Catch(Observable.Empty<Unit>())是将序列从以错误结束的序列更改为以完成结束的序列的正确方法.
您已经找到了使用SelectMany处理源集合的每个值的正确想法,以便您可以捕获每个异常,但是您会遇到一些问题.
您正在使用任务(TPL)将函数调用转换为可观察对象.这会强制您的observable使用任务池线程,这意味着该SelectMany语句可能会以非确定性顺序生成值.
您还隐藏了处理数据的实际调用,使重构和维护变得更加困难.
我认为你最好创建一个允许跳过异常的扩展方法.这里是:
public static IObservable<R> SelectAndSkipOnException<T, R>(
this IObservable<T> source, Func<T, R> selector)
{
return
source
.Select(t =>
Observable.Start(() => selector(t)).Catch(Observable.Empty<R>()))
.Merge();
}
Run Code Online (Sandbox Code Playgroud)
使用此方法,您现在可以简单地执行此操作:
var result =
collection.ToObservable()
.SelectAndSkipOnException(t =>
{
var a = DoA(t);
var b = DoB(a);
var c = DoC(b);
return c;
});
Run Code Online (Sandbox Code Playgroud)
此代码更简单,但它隐藏了异常.如果你想在继续你的序列的同时坚持异常,那么你需要做一些额外的乐趣.向Materialize扩展方法添加几个重载可以防止错误.
public static IObservable<Notification<R>> Materialize<T, R>(
this IObservable<T> source, Func<T, R> selector)
{
return source.Select(t => Notification.CreateOnNext(t)).Materialize(selector);
}
public static IObservable<Notification<R>> Materialize<T, R>(
this IObservable<Notification<T>> source, Func<T, R> selector)
{
Func<Notification<T>, Notification<R>> f = nt =>
{
if (nt.Kind == NotificationKind.OnNext)
{
try
{
return Notification.CreateOnNext<R>(selector(nt.Value));
}
catch (Exception ex)
{
ex.Data["Value"] = nt.Value;
ex.Data["Selector"] = selector;
return Notification.CreateOnError<R>(ex);
}
}
else
{
if (nt.Kind == NotificationKind.OnError)
{
return Notification.CreateOnError<R>(nt.Exception);
}
else
{
return Notification.CreateOnCompleted<R>();
}
}
};
return source.Select(nt => f(nt));
}
Run Code Online (Sandbox Code Playgroud)
这些方法允许你写这个:
var result =
collection
.ToObservable()
.Materialize(t =>
{
var a = DoA(t);
var b = DoB(a);
var c = DoC(b);
return c;
})
.Do(nt =>
{
if (nt.Kind == NotificationKind.OnError)
{
/* Process the error in `nt.Exception` */
}
})
.Where(nt => nt.Kind != NotificationKind.OnError)
.Dematerialize();
Run Code Online (Sandbox Code Playgroud)
您甚至可以链接这些Materialize方法并使用ex.Data["Value"]&ex.Data["Selector"]来获取抛出错误的值和选择器函数.
我希望这有帮助.
IObservable和之间的合同IObserver受到OnNext*(OnCompelted|OnError)?所有运营商的维护,即使不是来源。
您唯一的选择是使用 重新订阅源Retry,但如果源返回每个描述的IObservable 实例,您将不会看到任何新值。
您能否提供有关您的场景的更多信息?也许还有另一种看待它的方式。
编辑:根据您更新的反馈,听起来您只需要Catch:
var result =
from element in collection.ToObservable()
from aResult in DoAAsync(element).ToObservable().Log().Catch(Observable.Empty<TA>())
from bResult in DoBAsync(aResult).ToObservable().Log().Catch(Observable.Empty<TB>())
from cResult in DoCAsync(bResult).ToObservable().Log().Catch(Observable.Empty<TC>())
select cResult;
Run Code Online (Sandbox Code Playgroud)
这会将错误替换为Empty不会触发下一个序列的错误(因为它SelectMany在幕后使用。
| 归档时间: |
|
| 查看次数: |
5753 次 |
| 最近记录: |