调度程序:立即与CurrentThread

Ser*_*lla 9 system.reactive

在阅读了解释原因之后

Observable.Return(5)
  .Repeat()
  .Take(1)
Run Code Online (Sandbox Code Playgroud)

永远不会完成,但是

Observable.Return(5, Scheduler.CurrentThread)
  .Repeat()
  .Take(1)
Run Code Online (Sandbox Code Playgroud)

按预期工作.我仍然感到困惑,我不知道为什么currentThread实际上解决了这个问题.有人可以给出明确的解释吗?

Abs*_*lom 6

内德·斯托亚诺夫(Ned Stoyanov)在上述评论中提供的链接由戴夫·塞克斯顿(Dave Sexton)进行了很好的解释。

我将尝试以不同的方式说明它。以本示例为例,其中RecursiveTest方法中发生了递归调用。

public class RecursiveTest()
{
    private bool _isDone;

    public void RecursiveMethod()
    {
        if (!_isDone)
        {
            RecursiveMethod();

           // Never gets here...
           _isDone = true;
        }
    }  
}
Run Code Online (Sandbox Code Playgroud)

您可以轻松地看到这将无限期递归(直到StackOverflowException),因为_isDone永远不会设置为true。这是一个过于简化的示例,但基本上是您的第一个示例所进行的操作。

这是Dave Sexton的解释,描述了第一个示例中发生的情况。

默认情况下,Return使用InstantScheduler调用OnNext(1),然后调用OnCompleted()。Repeat不会引入任何并发性,因此它会立即看到OnCompleted,然后立即重新订阅Return。由于Return中没有蹦床,因此此模式会重复进行,无限期地阻塞当前线程。调用此可观察对象将永远不会返回。

换句话说,由于折返的无限循环,初始流程永远不会完全完成。因此,我们需要一种无需重新输入即可完成初始流程的方法。

让我们回到本文上面的RecursiveTest示例,如何避免无限递归?在再次执行RecursiveMethod之前,我们需要RecursiveMethod完成其流程。实现此目的的一种方法是让一个队列并使对RecursiveMethod的调用排队,如下所示:

public void RecursiveMethod()
{
    if (!_isDone)
    {
        Enqueue(RecursiveMethod);
        _isDone = true;
    }
}  
Run Code Online (Sandbox Code Playgroud)

这样,初始流程将完成,_isDone将被设置为true,并且在执行下一次对RecursiveMethod的调用时,将不再执行任何操作,从而避免了无限递归。这几乎是Scheduler.CurrentThread将对第二个示例执行的操作。

让我们看看Dave Sexton如何解释第二个示例的工作原理:

在这里,Return使用CurrentTheadScheduler调用OnNext(1),然后调用OnCompleted()。Repeat不会引入任何并发性,因此它会立即看到OnCompleted,然后立即重新订阅Return;但是,对Return的第二次预订在蹦床上安排了其(内部)动作,因为它仍在第一个计划的(外部)动作中的OnCompleted回调上执行,因此不会立即发生重复。这样,Repeat可以将一个一次性对象返回给Take,该对象最终会调用OnCompleted,通过处置Repeat来取消重复,最终,来自Subscribe的调用会返回。

再次重申,我的示例确实经过简化,以使其易于理解,并且其运行方式也不完全相同。在这里,您可以了解调度程序的真正工作原理。它使用他们所谓的蹦床,基本上就是一个确保没有重入呼叫的队列。因此,所有调用在同一线程上一个接一个地序列化。这样,可以完成初始流程,从而避免了无限重入循环。

希望这会更清楚:)