为什么我的RX链阻塞?

Che*_*tah 2 c# system.reactive

所以我有以下RX更改,但它似乎阻止选择,好像要保留顺序.我的理解是它应该继续委托给任务池?

var observable = Observable.Interval(TimeSpan.FromMilliseconds(10));

observable.ObserveOn(Scheduler.TaskPool)
    .Select(
    i =>
    {
        Console.WriteLine("Here" + System.Threading.Thread.CurrentThread.ManagedThreadId);
        System.Threading.Thread.Sleep(5000);
        return i;
    })
    .ObserveOn(Scheduler.TaskPool)
    .SubscribeOn(Scheduler.TaskPool)
    .Subscribe(i => { Console.WriteLine(i); });
Run Code Online (Sandbox Code Playgroud)

Jam*_*rld 6

确保事件以串行方式传递给用户是Rx语法的核心部分,也是正确操作的基础.它在大多数Rx运算符中强制执行,您不应该违反此规则.

ObserveOn和SubscribeOn的机制在这里得到了充分的解决.

ObserveOn的目的是避免阻塞调度事件的observable的线程和/或控制订阅者接收事件的线程(在您的情况下使用任务池来传递它们).

它不做的是允许订户同时接收事件 - 正如我所说,这将违反Rx的规则.

您可能会在类似的主题上找到这个问题值得一读.

  • @Cheetah - 使用各种`Subjects`或`Publish`或`Multicast`来创建可以提供给多个订阅者的observable.`Publish().RefCount().ObserveOn()`是一种同时传递给多个订阅者的好方法.另外,如果您使用了'SelectMany`而不是`Select`并返回了`Observable.Return(i).Delay(TimeSpan.FromSeconds(5))`而不是`Thread.Sleep`,那么你的OP中你可以有效地删除了阻止并允许所有睡眠同时运行(如果你有一些更复杂的持续延迟,可能会无序传递). (2认同)