订阅并在第一次操作后立即取消订阅

Num*_*ock 6 c# extension-methods idisposable system.reactive

我想IObservable<T>在收到第一个类型元素之后订阅并取消订阅(dipose)订阅T,即我只想在订阅后获得的第一个元素上调用该操作.

这是我提出的方法:

public static class Extensions
{
    public static void SubscribeOnce<T>(this IObservable<T> observable, Action<T> action)
    {
        IDisposable subscription = null;
        subscription = observable.Subscribe(t =>
        {
            action(t);
            subscription.Dispose();
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

用法示例:

public class Program
{
    public static void Main()
    {
        var subject = new Subject<int>();

        subject.OnNext(0);
        subject.OnNext(1);
        subject.SubscribeOnce(i => Console.WriteLine(i));
        subject.OnNext(2);
        subject.OnNext(3);

        Console.ReadLine();
    }
}
Run Code Online (Sandbox Code Playgroud)

它按预期工作,只打印2.这个或其他什么问题有什么不妥吗?是否有一种更清洁的方式使用RX开箱即用的扩展方法?

cwh*_*ris 16

var source = new Subject();

source
  .Take(1)
  .Subscribe(Console.WriteLine);

source.OnNext(5);
source.OnNext(6);
source.OnError(new Exception("oops!"));
source.OnNext(7);
source.OnNext(8);

// Output is "5". Subscription is auto-disposed. Error is ignored.
Run Code Online (Sandbox Code Playgroud)

Takenth元素产生后自动处理订阅.:)

至于其他要考虑的事项,对于您的自定义observable,您应该注意到您可能还想将OnError和OnCompleted通知传递给您的观察者,Take也为您处理.

内置的操作员也有其他好处,例如更好的Dispose操作.