lbe*_*ehr 5 .net c# subject system.reactive
我有一个场景,我有多个IObservable序列,我想结合,Merge然后听.但是,如果其中一个产生错误,我不希望它崩溃其他流的所有内容,以及重新订阅序列(这是一个'永久'序列).
我这样做是通过Retry()在合并之前向流附加一个,即:
IEnumerable<IObservable<int>> observables = GetObservables();
observables
.Select(o => o.Retry())
.Merge()
.Subscribe(/* Do subscription stuff */);
Run Code Online (Sandbox Code Playgroud)
但是,当我想测试时会出现问题.我想测试的是,如果其中一个IObservables observables产生一个OnError,其他的应该仍然能够发送它们的值,它们应该被处理
我以为我只用两个Subject<int>s代表两个IObservables observables; 发送一个OnError(new Exception())和另一个,然后发送OnNext(1).但是,它似乎Subject<int>将重播新订阅的所有先前值(实际上Retry()是这样),将测试变为无限循环.
我尝试通过创建一个手册来解决它,该手册IObservable在第一个订阅上产生错误,然后是一个空序列,但它感觉很hacky:
var i = 0;
var nErrors = 2;
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
i++;
if (i < nErrors) {
return Observable.Throw<int>(new Exception()).Subscribe(o);
} else {
return Observable.Empty<int>().Subscribe(o);
}
});
Run Code Online (Sandbox Code Playgroud)
我是以错误的方式使用Subject或思考的Retry()吗?还有其他想法吗?你会如何解决这种情况?
好的,这是我想要和想到 的大理石图Retry().
o = message, X = error.
------o---o---X
\
Retry() -> \---o---o---X
\
Retry() -> \...
Run Code Online (Sandbox Code Playgroud)
我的问题可能更多,因为我没有一个好的库存类来使用前测试,因为Subject我想重播我以前的所有错误.
这是一个测试用例,显示了Subject重放其值的意思.如果我说它以冷的方式做到这一点,我是否正确使用该术语?我知道这Subject是一种创造热的可观察性的方式,但这种行为对我来说仍然感觉"冷".
var onNext = false;
var subject = new Subject<int>();
subject.Retry().Subscribe(x => onNext = true);
subject.OnError(new Exception());
subject.OnNext(1);
Assert.That(onNext, Is.True);
Run Code Online (Sandbox Code Playgroud)
根据您更新的要求(您想要重试失败的可观察量,而不是只想忽略它们),我们可以提出一个可行的解决方案。
首先,了解冷可观察值(在每个订阅上重新创建)和热可观察值(无论订阅如何都存在)之间的区别很重要。您无法Retry()进行热观察,因为它不知道如何重新创建底层事件。也就是说,如果一个热的可观察错误,它就会永远消失。
Subject创建一个热可观察对象,从某种意义上说,您可以在OnNext没有订阅者的情况下进行调用,并且它会按预期运行。要将热可观察值转换为冷可观察值,您可以使用Observable.Defer,它将包含该可观察值的“订阅创建”逻辑。
话虽如此,这里是修改后的原始代码:
var success = new Subject<int>();
var error = new Subject<int>();
var observables = new List<IObservable<int>> { Observable.Defer(() => {success = new Subject<int>(); return success.AsObservable();}),
Observable.Defer(() => {error = new Subject<int>(); return error.AsObservable();}) };
observables
.Select(o => o.Retry())
.Merge()
.Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done"));
Run Code Online (Sandbox Code Playgroud)
和测试(与之前类似):
success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
error.OnNext(-1);
success.OnCompleted();
error.OnCompleted();
Run Code Online (Sandbox Code Playgroud)
以及预期的输出:
1
2
-1
done
Run Code Online (Sandbox Code Playgroud)
当然,您需要根据您底层可观察的内容来显着修改这个概念。使用受试者进行测试与实际使用它们不同。
我还想指出这个评论:
然而,Subject 似乎会重播新订阅的所有先前值(实际上是 Retry() ),从而将测试变成无限循环。
事实并非如此——Subject行为并非如此。基于Retry重新创建订阅的事实,代码的其他一些方面导致了无限循环,并且订阅在某个时刻会产生错误。
原答案(用于补全)
问题是它Retry()没有做你想做的事。从这里:
http://msdn.microsoft.com/en-us/library/ff708141(v=vs.92).aspx
重复源可观察序列 retryCount 次或直到成功终止。
这意味着Retry它将不断尝试重新连接到底层可观察对象,直到成功并且不会抛出错误。
我的理解是,您实际上希望忽略可观察到的异常,而不是重试。这将做你想做的事:
observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(/* subscription code */);
Run Code Online (Sandbox Code Playgroud)
这用于Catch捕获异常的可观察量,并在此时将其替换为空的可观察量。
这是使用主题的完整测试:
var success = new Subject<int>();
var error = new Subject<int>();
var observables = new List<IObservable<int>> { success.AsObservable(), error.AsObservable() };
observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(Observer.Create<int>(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done")));
success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
success.OnCompleted();
Run Code Online (Sandbox Code Playgroud)
正如预期的那样,这会产生:
1
2
done
Run Code Online (Sandbox Code Playgroud)