Rx .NET 中的调度

Dmi*_*gin 2 c# system.reactive .net-core

预计都在 .NET Core 2.0 控制台应用程序的主线程上执行,所以输出被阻塞了 10 秒:

    static void Main(string[] args)
    {
        WriteLine($"We are on {Thread.CurrentThread.ManagedThreadId}");

        var subject = new Subject<long>();
        var subscription = subject.Subscribe(
            i => WriteLine($"tick on {Thread.CurrentThread.ManagedThreadId}"));

        var timer = Observable.Interval(TimeSpan.FromSeconds(1))
            .SubscribeOn(Scheduler.CurrentThread)
            .Subscribe(i => subject.OnNext(i));

        Thread.Sleep(10000);
    }
Run Code Online (Sandbox Code Playgroud)

但事实并非如此——随机线程每隔一秒就会有一条新行进入控制台:

We are on 1
tick on 4
tick on 5
tick on 4
tick on 4
tick on 4
tick on 4
tick on 4
tick on 4
tick on 5
Run Code Online (Sandbox Code Playgroud)

我做错了什么?

Mar*_*ich 5

Scheduler.CurrentThread/CurrentThreadScheduler将队列作出调用的时间表,这将是计时器发生在其上运行的线程在同一线程上的项目。调用Scheduler.CurrentThread不会将通过它调度的项目的执行固定到您调用Scheduler.CurrentThread的线程,而是将调用的线程固定到.Schedule().

此外,您调用SubscribeOn()它只会影响.Subscribe()将要进行调用的线程。如果您想控制项目处理的执行,您宁愿调用.ObserveOn().

如果您希望所有内容都在主线程上运行,我建议在主线程上运行计时器,方法是在可观察的间隔上指定调度程序:

Observable.Interval(TimeSpan.FromSeconds(1), Scheduler.CurrentThread)
Run Code Online (Sandbox Code Playgroud)