为什么Observable.Finally在Observable.Generate结束时没有被调用?

bj0*_*bj0 2 c# task-parallel-library system.reactive

我需要在两个状态之间交替,每个状态具有不同的间隔时间.
我能想到这样做的最好方法是使用Reactive Extensions的Observable.Generate非常棒.

根据我在msdn和其他网站上看到的内容,Observable.Finally()应该在observable"优雅或异常终止"时触发.我正在测试以下代码(在LINQPad中)以查看它是如何工作的,但我根本无法获取.Finall().

var ia = TimeSpan.FromSeconds(1);
var ib = TimeSpan.FromSeconds(.2);
var start = DateTime.UtcNow;
var ct = new CancellationTokenSource();

var o = Observable.Generate(
    true,
//    s => !ct.IsCancellationRequested,
    s => (DateTime.UtcNow-start) < TimeSpan.FromSeconds(3) && !ct.IsCancellationRequested,
    s => !s,
    s => s ? "on" : "off",
    s => s? ib : ia)
//    .TakeUntil(start+TimeSpan.FromSeconds(3))
    .Concat(Observable.Return("end"));


o.Subscribe( s=> s.Dump(), ct.Token);
var t = o.ToTask(ct.Token);


t.ContinueWith(x => x.Dump("done"));
o.Finally(() => "finallY".Dump()); // never gets called?

Thread.Sleep(10000);
ct.Cancel();
Run Code Online (Sandbox Code Playgroud)

如果我使Thread.Sleep 10s,可观察序列完成并且Task.ContinueWith触发,但不是.Finally().

如果我使Thread.Sleep 2s,可观察序列被取消并且Task.ContinueWith再次触发,但不是.Finally().

为什么不?

Dax*_*ohl 5

查看Finally方法的返回类型; 应该给你一个提示.就像该Concat方法返回一个 IObservable的序列连接的新序列,但不更改原始,该Finally方法返回一个IObservable具有该最终操作的新,但您订阅原始IObservable.将以下行放在Subscribe通话前,它就能正常工作.

o = o.Finally(() => "finallY".Dump());
Run Code Online (Sandbox Code Playgroud)

我同意这是一个奇怪的API选择; 我觉得Finally作为更类似于SubscribeConcat.你订阅了最后的"事件"; 奇怪的是,API 强制你创建一个全新的IObservable,然后订阅它只是为了让Finally事情发生.另外,它允许潜在的错误(如果我们在您的问题中使用函数,这很明显),如果您订阅两次新的IObservable,您的Finally函数将执行两次.因此,您必须确保您的某个订阅位于"finallied" IObservable上,而其他订阅都在原始版本上.看起来很不寻常.

我想考虑它的方法Finally是不是要修改observable,而是修改订阅本身.也就是说,他们通常不希望你制作可公开访问的名为observables的Finally东西(var o = Observable.[...].Finally(...);)而不是它意味着与订阅调用本身(var subscription = o.Finally(...).Subscribe(...);)内联