为什么Rx Observable.Subscribe阻止我的线程?

Eld*_*dar 4 system.reactive observable

您好'我已经尝试了101个Rx示例中的一个:

    static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
    {
        int i = 0;

        while (true)
        {
            if (i > 1000)
            {
                yield break;
            }
            yield return i;
            Thread.Sleep(i++ % 10 < 5 ? 500 : 1000);
        }
    }

    private static void Main()
    {
        var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
        var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));

        using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
Run Code Online (Sandbox Code Playgroud)

我不明白为什么"按任意键取消订阅"这一行从未显示过.我的理解是订阅是异步的,你订阅它并立即返回.我错过了什么导致我的主线程被阻止?

Ric*_*lay 7

阻塞是由您的可枚举循环while (true)IEnumerable<T>.ToObservable()默认扩展方法的组合引起的CurrentThreadScheduler.

如果您提供Scheduler.TaskPool(或Scheduler.ThreadPool在.NET 4之前)过载ToObservable,您应该看到您期望的行为(尽管它不会在主线程FYI上调用您的订户).

话虽如此,我认为你会发现你的组合,Thread.Sleep并将Throttle按预期工作.您最好创建一个使用调度程序来安排延迟的自定义observable.