为什么我需要在完成后处理订阅?

Gus*_*dor 2 idisposable object-lifetime reactive-programming system.reactive reactive

在介绍到RX本书描述了OnSubscribe作为返回值IDisposible,并指出,订阅应该布置时OnErrorOnCompleted被调用.

一个有趣的事情是,当一个序列完成或错误时,你仍然应该处理你的订阅.

Intro到RX:Lifetime Management,OnError和OnCompleted

为什么是这样?


作为参考,这是我目前正在研究的课程.我可能会在某些时候将其提交给代码审查.

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

/// <summary>
/// Provides a timeout mechanism that will not timeout if it is signalled often enough
/// </summary>
internal class TrafficTimeout
{
    private readonly Action onTimeout;
    private object signalLock = new object();
    private IObserver<Unit> signals;

    /// <summary>
    /// Initialises a new instance of the <see cref="TrafficTimeout"/> class.
    /// </summary>
    /// <param name="timeout">The duration to wait after receiving signals before timing out.</param>
    /// <param name="onTimeout">The <see cref="Action"/> to perform when the the timeout duration expires.</param>
    public TrafficTimeout(TimeSpan timeout, Action onTimeout)
    {
        // Subscribe to a throttled observable to trigger the expirey
        var messageQueue = new BehaviorSubject<Unit>(Unit.Default);
        IDisposable subscription = null;
        subscription = messageQueue.Throttle(timeout).Subscribe(
        p =>
        {
            messageQueue.OnCompleted();
            messageQueue.Dispose();
        });

        this.signals = messageQueue.AsObserver();
        this.onTimeout = onTimeout;
    }

    /// <summary>
    /// Signals that traffic has been received.
    /// </summary>
    public void Signal()
    {
        lock (this.signalLock)
        {
            this.signals.OnNext(Unit.Default);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Eni*_*ity 6

返回Subscribe方法返回的一次性返回仅允许您在观察结果自然结束之前手动取消订阅观察者.

如果observable完成 - 使用OnCompleted或者OnError- 那么订阅已经为您处理.

试试这段代码:

var xs = Observable.Create<int>(o =>
{
    var d = Observable.Return(1).Subscribe(o);
    return Disposable.Create(() =>
    {
        Console.WriteLine("Disposed!");
        d.Dispose();
    });
});

var subscription = xs.Subscribe(x => Console.WriteLine(x));
Run Code Online (Sandbox Code Playgroud)

如果你运行上面的程序,你会看到"Disposed!" 在observable完成时写入控制台,而无需调用.Dispose()订阅.

需要注意的一件重要事情是:垃圾收集器从不调用.Dispose()可观察的订阅,因此如果订阅在订阅超出范围之前没有(或可能没有)自然结束,则必须处置订阅.

以此为例,例如:

var wc = new WebClient();

var ds = Observable
    .FromEventPattern<
        DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => wc.DownloadStringCompleted += h,
            h => wc.DownloadStringCompleted -= h);

var subscription =
    ds.Subscribe(d =>
        Console.WriteLine(d.EventArgs.Result));
Run Code Online (Sandbox Code Playgroud)

ds观察到的将只重视事件处理程序时,它有一个订阅,当观察到完成或订阅配置只会脱离.因为它是一个事件处理程序,所以observable永远不会完成,因为它正在等待更多的事件,因此处理是从事件中分离的唯一方法(对于上面的例子).

如果你有一个FromEventPattern你知道只会返回一个值的observable,那么.Take(1)在订阅允许事件处理程序自动分离之前添加扩展方法是明智的,然后你不需要手动处理订阅.

像这样:

var ds = Observable
    .FromEventPattern<
        DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => wc.DownloadStringCompleted += h,
            h => wc.DownloadStringCompleted -= h)
    .Take(1);
Run Code Online (Sandbox Code Playgroud)

我希望这有帮助.