在 Observable 项目生成时处理它们

lap*_*tou 10 c# reactive-programming system.reactive

我有一个IObservable生成一次性物品的物品,它会在其生命周期内生成无限数量的物品。因此,每次生成新项目时,我都想处理最后一个项目,因此Using运算符对此不起作用。是否有不同的 Rx.NET 运算符可以完成此功能?

Eni*_*ity 5

如果您有,IObservable<IDisposable> source则执行此操作以自动处理先前的值并在序列结束时进行清理:

IObservable<IDisposable> query =
    Observable.Create<IDisposable>(o =>
    {
        var serial = new SerialDisposable();
        return new CompositeDisposable(
            source.Do(x => serial.Disposable = x).Subscribe(o),
            serial);
    })
Run Code Online (Sandbox Code Playgroud)


lap*_*tou 0

我看到了Shlomo 的这个答案,并根据我自己的目的进行了修改:

public class DisposerProperty<T> : IDisposable, IObservable<T> where T : IDisposable
{
    private IDisposable Subscription { get; }
    private IObservable<T> Source { get; }

    public T Value { get; private set; }

    public DisposerProperty(IObservable<T> source, T defaultValue = default)
    {
        Value = defaultValue;
        Source = source;
        Subscription = source.Subscribe(t =>
                                        {
                                            Value?.Dispose();
                                            Value = t;
                                        });
    }

    public void Dispose() => Subscription.Dispose();

    /// <inheritdoc />
    public IDisposable Subscribe(IObserver<T> observer) => Source.Subscribe(observer);
}
Run Code Online (Sandbox Code Playgroud)

现在,每当我需要此功能时,我只需使用 aDisposerProperty<T>而不是直接订阅可观察对象。