可观察的取消令牌

Mde*_*dev 5 c# system.reactive

如果在StartButton单击上创建以下可观察对象,即从停止按钮创建,则如何取消以下类型的Rx Observable.

var instance = ThreadPoolScheduler.Instance; 

Observable.Interval(TimeSpan.FromSeconds(2), instance)
                     .Subscribe(_ =>
                     {
                     Console.WriteLine(DateTime.Now); // dummy event
                     }
                     );         
Run Code Online (Sandbox Code Playgroud)

Jon*_*eet 12

你保留了IDisposable返回的内容Subscribe,然后调用Dispose它.

可能有一种方法可以将IDisposable基于Rx 的取消订阅与CancellationToken开箱即用相结合,但只是调用Dispose将是一个开始.(您可以随时注册一个带有取消令牌的续约来调用dispose ...)

这是一个简短但完整的示例来演示:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        var instance = ThreadPoolScheduler.Instance;
        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

        var disposable = Observable
            .Interval(TimeSpan.FromSeconds(0.5), instance)
            .Subscribe(_ => Console.WriteLine(DateTime.UtcNow));
        cts.Token.Register(() => disposable.Dispose());
        Thread.Sleep(10000);
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 我喜欢你偷偷用 DateTime.UtcNow 替换 DateTime.Now :) (2认同)

Tim*_*lds 12

只要使用的重载之一Subscribe,需要一个CancellationToken:

observable.Subscribe(_ => Console.WriteLine(DateTime.UtcNow), cancellationToken);
Run Code Online (Sandbox Code Playgroud)

这简化了Jon Skeet的例子:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        var instance = ThreadPoolScheduler.Instance;
        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

        Observable.Interval(TimeSpan.FromSeconds(0.5), instance)
            .Subscribe(_ => Console.WriteLine(DateTime.UtcNow), cts.Token);
        Thread.Sleep(10000);
    }
}
Run Code Online (Sandbox Code Playgroud)