创建对IObservable的弱订阅

For*_*say 21 c# garbage-collection weak-references system.reactive

我想要做的是确保如果对我的观察者的唯一引用是可观察的,它会被垃圾收集并停止接收消息.

假设我有一个带有列表框的控件,名为Messages,后面是这个代码:

//Short lived display of messages (only while the user's viewing incoming messages)
public partial class MessageDisplay : UserControl
{
    public MessageDisplay()
    {
        InitializeComponent();
        MySource.IncomingMessages.Subscribe(m => Messages.Items.Add(m));
    }
}
Run Code Online (Sandbox Code Playgroud)

哪个连接到此来源:

//Long lived location for message store
static class MySource
{
    public readonly static IObservable<string> IncomingMessages = new ReplaySubject<string>;
}
Run Code Online (Sandbox Code Playgroud)

我不想要的是让消息显示器在不再可见后很长时间内保存在内存中.理想情况下,我想要一点延伸,所以我可以写:

MySource.IncomingMessages.ToWeakObservable().Subscribe(m => Messages.Items.Add(m));
Run Code Online (Sandbox Code Playgroud)

我也不想依赖MessageDisplay是一个用户控件的事实,因为我稍后想要使用MessageDisplayViewModel进行MVVM设置,这不是用户控件.

dtb*_*dtb 13

您可以将代理观察者订阅到包含对实际观察者的弱引用的observable,并在实际观察者不再存活时处置订阅:

static IDisposable WeakSubscribe<T>(
    this IObservable<T> observable, IObserver<T> observer)
{
    return new WeakSubscription<T>(observable, observer);
}

class WeakSubscription<T> : IDisposable, IObserver<T>
{
    private readonly WeakReference reference;
    private readonly IDisposable subscription;
    private bool disposed;

    public WeakSubscription(IObservable<T> observable, IObserver<T> observer)
    {
        this.reference = new WeakReference(observer);
        this.subscription = observable.Subscribe(this);
    }

    void IObserver<T>.OnCompleted()
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnCompleted();
        else this.Dispose();
    }

    void IObserver<T>.OnError(Exception error)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnError(error);
        else this.Dispose();
    }

    void IObserver<T>.OnNext(T value)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnNext(value);
        else this.Dispose();
    }

    public void Dispose()
    {
        if (!this.disposed)
        {
            this.disposed = true;
            this.subscription.Dispose();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 您应该在此之前执行投影和过滤。Subscribe(M=&gt;DoSomethingWithM(M)) 在内部创建一个 IObserver&lt;T&gt; 来包装委托 M=&gt;DoSomethingWithM(M)。您需要让内部创建的 IObserver&lt;T&gt; 保持活动状态,这是不可能的,因为它是内部的。因此,在考虑之后,我实际上不会再推荐我的答案。寻找不同的方法。 (2认同)

For*_*say -1

下面的代码的灵感来自 dtb 的原始帖子。唯一的变化是它返回对观察者的引用作为 IDisposable 的一部分。这意味着,只要您保留对在链末尾获得的 IDisposable 的引用,对 IObserver 的引用就会保持活动状态(假设所有一次性对象都保留对它们之前的一次性对象的引用)。这允许使用扩展方法,例如Subscribe(M=>DoSomethingWithM(M))因为我们保留对隐式构造的 IObserver 的引用,但我们不保留从源到 IObserver 的强引用(这会产生内存泄漏)。

using System.Reactive.Linq;

static class WeakObservation
{
    public static IObservable<T> ToWeakObservable<T>(this IObservable<T> observable)
    {
        return Observable.Create<T>(observer =>
            (IDisposable)new DisposableReference(new WeakObserver<T>(observable, observer), observer)
            );
    }
}

class DisposableReference : IDisposable
{
    public DisposableReference(IDisposable InnerDisposable, object Reference)
    {
        this.InnerDisposable = InnerDisposable;
        this.Reference = Reference;
    }

    private IDisposable InnerDisposable;
    private object Reference;

    public void Dispose()
    {
        InnerDisposable.Dispose();
        Reference = null;
    }
}

class WeakObserver<T> : IObserver<T>, IDisposable
{
    private readonly WeakReference reference;
    private readonly IDisposable subscription;
    private bool disposed;

    public WeakObserver(IObservable<T> observable, IObserver<T> observer)
    {
        this.reference = new WeakReference(observer);
        this.subscription = observable.Subscribe(this);
    }

    public void OnCompleted()
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnCompleted();
        else this.Dispose();
    }

    public void OnError(Exception error)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnError(error);
        else this.Dispose();
    }

    public void OnNext(T value)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnNext(value);
        else this.Dispose();
    }

    public void Dispose()
    {
        if (!this.disposed)
        {
            this.disposed = true;
            this.subscription.Dispose();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 我刚刚测试了这段代码,它仍然泄漏。我强烈建议不要尝试这样做。整个思路都存在问题。1) 静态重放主题 - 它永远不会释放其缓存 2) 如果您不为 Rx 实现 Dispose 模式,您还不会释放什么?- 事件处理程序、IO 连接?3) 用户无法确定地处置您的资源 4) 代码实际上不起作用 5) 少即是多。你有更多的代码会欺骗其他编码员,让他们认为这段代码可以工作,但实际上却行不通,只会创建 100 行噪音代码。**请不要这样做** (2认同)