ObserveOn和SubscribeOn - 正在完成工作的地方

Che*_*tah 55 .net c# system.reactive

基于阅读这个问题:SubscribeOn和ObserveOn之间有什么区别

ObserveOn设置Subscribe处理程序中代码的执行位置:

stream.Subscribe(_ => { // this code here });

SubscribeOn方法设置完成流的设置的线程.

我明白如果没有明确设置,那么使用TaskPool.

现在我的问题是,让我说我做这样的事情:

Observable.Interval(new Timespan(0, 0, 1)).Where(t => predicate(t)).SelectMany(t => lots_of(t)).ObserveOnDispatcher().Subscribe(t => some_action(t));

哪里有Where predicateSelectMany lots_of执行,因为some_action正在上调度执行?

Jam*_*rld 171

那里有很多关于SubscribeOn和的误导性信息ObserveOn.

摘要

  • SubscribeOn拦截对单个方法的调用IObservable<T>,即 Subscribe调用返回DisposeIDisposable句柄Subscribe.
  • ObserveOn拦截调用的方法IObserver<T>,其是OnNext,OnCompletedOnError.
  • 这两种方法都会导致在指定的调度程序上进行相应的调用.

分析与示范

该声明

ObserveOn设置执行Subscribe处理程序中代码的位置:

比起帮助更令人困惑.您所指的"订阅处理程序"实际上是一个OnNext处理程序.请记住,Subscribe方法IObservable接受一个IObserverOnNext,OnCompletedOnError方法,但扩展方法提供了接受lambda表达式,并建立一个方便重载IObserver实现你.

让我说一下这个词; 我认为"订阅处理程序" 是调用时调用的observable中的代码Subscribe.这样,上面的描述更接近于目的SubscribeOn.

SubscribeOn

SubscribeOn导致Subscribeobservable 的方法在指定的调度程序或上下文上异步执行.当你不想Subscribe从你正在运行的任何线程上调用observable上的方法时,你可以使用它- 通常是因为它可以长时间运行并且你不想阻止调用线程.

当你打电话时Subscribe,你正在召唤一个观察者,这个观察者可能是一长串可观察者的一部分.它只是SubscribeOn应用于它的效果的可观察量.现在情况可能是链中的所有可观察对象都将立即订阅并在同一个线程上 - 但事实并非如此.Concat例如,考虑一下前一个流完成时只订阅每个连续流,通常这将发生在前一个流调用的任何线程上OnCompleted.

因此,SubscribeOn在您调用Subscribe和您正在订阅的可观察对象之间,拦截调用并使其异步.

它还会影响订阅的处理.Subscribe返回IDisposable用于取消订阅的句柄.SubscribeOn确保Dispose在提供的调度程序上安排调用.

想了解什么,当混乱的公共点SubscribeOn确实是在Subscribe一个可观察的处理程序可能调用OnNext,OnCompletedOnError在这同一线程.但是,其目的不是影响这些调用.在Subscribe方法返回之前完成流的情况并不少见.Observable.Return这样做,例如.让我们来看看.

如果您使用我编写的Spy方法,并运行以下代码:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");
Run Code Online (Sandbox Code Playgroud)

你得到这个输出(线程ID可能会有所不同):

Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned
Run Code Online (Sandbox Code Playgroud)

您可以看到整个订阅处理程序在同一个线程上运行,并在返回之前完成.

让我们用它SubscribeOn来异步运行它.我们将监视Return可观察的和SubscribeOn可观察的:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");
Run Code Online (Sandbox Code Playgroud)

这个输出(由我添加的行号):

01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 SubscribeOn: Observable obtained on Thread: 1
04 SubscribeOn: Subscribed to on Thread: 1
05 SubscribeOn: Subscription completed.
06 Subscribe returned
07 Return: Subscribed to on Thread: 2
08 Return: OnNext(1) on Thread: 2
09 SubscribeOn: OnNext(1) on Thread: 2
10 Return: OnCompleted() on Thread: 2
11 SubscribeOn: OnCompleted() on Thread: 2
12 Return: Subscription completed.
Run Code Online (Sandbox Code Playgroud)

01 - 主要方法在线程1上运行.

02 - 在Return调用线程上评估observable.我们刚到IObservable这里,还没有订阅.

03 - 在SubscribeOn调用线程上评估observable.

04 -现在,我们终于调用Subscribe的方法SubscribeOn.

05 - 该Subscribe方法异步完成...

06 - ...和线程1返回主方法.这就是SubscribeOn在行动中的效果!

07 - 同时,SubscribeOn安排默认调度程序的调用Return.这是在线程2上收到的.

08 -作为Return呢,它调用OnNext的上Subscribe线...

09 - SubscribeOn现在只是一个通过.

10,11 - 相同 OnCompleted

12 - 最后Return完成订阅处理程序.

希望能够清除目的和效果SubscribeOn!

ObserveOn

如果您认为SubscribeOn作为一个拦截器Subscribe是通过调用方法上不同的线程,然后ObserveOn做同样的工作,但对于OnNext,OnCompletedOnError电话.

回想一下我们原来的例子:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");
Run Code Online (Sandbox Code Playgroud)

这给了这个输出:

Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned
Run Code Online (Sandbox Code Playgroud)

现在让我们改变它来使用ObserveOn:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Run Code Online (Sandbox Code Playgroud)

我们得到以下输出:

01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 ObserveOn: Observable obtained on Thread: 1
04 ObserveOn: Subscribed to on Thread: 1
05 Return: Subscribed to on Thread: 1
06 Return: OnNext(1) on Thread: 1
07 ObserveOn: OnNext(1) on Thread: 2
08 Return: OnCompleted() on Thread: 1
09 Return: Subscription completed.
10 ObserveOn: Subscription completed.
11 Subscribe returned
12 ObserveOn: OnCompleted() on Thread: 2
Run Code Online (Sandbox Code Playgroud)

01 - 主要方法在线程1上运行.

02 - 和以前一样,Return在调用线程上评估observable.我们刚到IObservable这里,还没有订阅.

03 - ObserveOnobservable也在调用线程上进行评估.

04 - 现在我们再次在调用线程上订阅,首先是ObserveOn可观察的...

05 - ...然后将呼叫传递给Return观察者.

06 - 现在Return调用OnNext它的Subscribe处理程序.

07 - 这是效果ObserveOn.我们可以看到OnNext在线程2上异步调度.

08 - 同时Return拨打OnCompleted线程1 ......

09 - 并且Return订阅处理程序完成了......

10 - 那么ObserveOn订阅处理程序也是如此......

11 - 所以控制返回到main方法

12 -与此同时,ObserveOn已经穿梭ReturnOnCompleted通话此期间,超过09-11主题2.这可能在任何时候发生,因为它是异步运行.恰好如此,它现在终于被称为了.

典型的用例是什么?

SubscribeOn当你需要Subscribe一个长时间运行的observable并希望尽快离开调度程序线程时,你经常会看到在GUI中使用- 可能是因为你知道它是那些在订阅处理程序中完成所有工作的observable之一.将它应用于可观察链的末尾,因为这是您订阅时调用的第一个observable.

ObserveOn当您想要确保时,您最常见到在GUI中使用OnNext,OnCompleted并且OnError调用被编组回调度程序线程.将其应用于可观察链的末端,以尽可能晚地过渡.

希望你可以看到,回答你的问题是ObserveOnDispatcher不会做出任何区别该线程WhereSelectMany正在上执行-这一切都取决于什么的线程从美其名曰!流的订阅处理程序将调用线程上调用,但它不可能说在哪里Where,并SelectMany会不知道如何运行stream实现.

生命周期比订阅调用更长的可观察对象

到目前为止,我们一直在寻找Observable.Return.ReturnSubscribe处理程序中完成其流.这不是非典型的,但流式Subscribe处理器的流程同样常见.看看Observable.Timer例如:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.Subscribe();
Console.WriteLine("Subscribe returned");
Run Code Online (Sandbox Code Playgroud)

这将返回以下内容:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
Run Code Online (Sandbox Code Playgroud)

你可以清楚地看到认购完成后OnNextOnCompleted正在对不同的线程以后调用.

请注意,没有任何组合SubscribeOnObserveOn将对哪个线程或调度程序选择调用和启用有任何影响.TimerOnNextOnCompleted

当然,你可以SubscribeOn用来确定Subscribe线程:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");
Run Code Online (Sandbox Code Playgroud)

(我故意改变到NewThreadScheduler这里以防止在Timer发生获取相同线程池线程的情况下出现混淆SubscribeOn)

赠送:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
SubscribeOn: Observable obtained on Thread: 1
SubscribeOn: Subscribed to on Thread: 1
SubscribeOn: Subscription completed.
Subscribe returned
Timer: Subscribed to on Thread: 2
Timer: Subscription completed.
Timer: OnNext(0) on Thread: 3
SubscribeOn: OnNext(0) on Thread: 3
Timer: OnCompleted() on Thread: 3
SubscribeOn: OnCompleted() on Thread: 3
Run Code Online (Sandbox Code Playgroud)

在这里,你可以清楚地看到上线(1)其回国后的主线程Subscribe调用,但Timer认购取得自己的线程(2),但OnNextOnCompleted呼吁线程上运行(3).

现在ObserveOn,让我们将代码更改为(对于那些在代码中跟随的代码,使用nuget包rx-wpf):

var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Run Code Online (Sandbox Code Playgroud)

这段代码有点不同.第一行确保我们有一个调度程序,我们也引入ObserveOnDispatcher- 这就像ObserveOn,除了它指定我们应该使用DispatcherScheduler 任何ObserveOnDispatcher被评估的线程.

此代码提供以下输出:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
ObserveOn: OnNext(0) on Thread: 1
Timer: OnCompleted() on Thread: 2
ObserveOn: OnCompleted() on Thread: 1
Run Code Online (Sandbox Code Playgroud)

请注意,调度程序(和主线程)是线程1. Timer仍在调用OnNextOnCompleted在其选择的线程上(2) - 但是ObserveOnDispatcher编组调用回调度程序线程thread(1).

另请注意,如果我们要阻止调度程序线程(例如a Thread.Sleep),您会看到ObserveOnDispatcher将阻塞(此代码在LINQPad主方法中最佳):

var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Console.WriteLine("Blocking the dispatcher");
Thread.Sleep(2000);
Console.WriteLine("Unblocked");
Run Code Online (Sandbox Code Playgroud)

你会看到这样的输出:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Blocking the dispatcher
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
Unblocked
ObserveOn: OnNext(0) on Thread: 1
ObserveOn: OnCompleted() on Thread: 1
Run Code Online (Sandbox Code Playgroud)

随着呼叫通过ObserveOnDispatcher只有能够一旦Sleep运行出来.

关键点

记住Reactive Extensions本质上是一个自由线程库是有用的,并试图尽可能地延迟它运行的线程 - 你必须故意干扰ObserveOn,SubscribeOn并将特定的调度程序传递给接受它们改变的运算符这个.

没有任何一个观察者能够控制它内部正在做什么 - ObserveOn并且SubscribeOn装饰器包裹观察者和可观察者的表面区域以跨越线程编组调用.希望这些例子清楚地表明了这一点.

  • 我将添加一个旨在帮助那些试图记住所有这一切的人的故事.解释James所说的一部分:`SubscribeOn`拦截对`Subscribe`的调用,`ObserverOn`拦截对`IObserver <T>`的调用.这两种方法都会导致在指定的调度程序上进行相应的调用. (5认同)
  • 很棒的答案.Rx MVP仅此一项.A ++++会再次+1. (2认同)
  • 对于这种答案,SO上应该有一个小额支付功能.对于有利可图的ICO来说,这可能是一个很好的商业计划! (2认同)

Dav*_*ier 13

我发现詹姆斯的答案非常明确和全面.然而,尽管如此,我仍然发现自己必须解释这些差异.

因此,我创建了一个非常简单/愚蠢的示例,它允许我以图形方式演示正在调用的调度程序.我已经创建了一个MyScheduler立即执行操作的类,但会更改控制台颜色.

SubscribeOn调度程序的文本输出以红色输出,而来自ObserveOn调度程序的文本以蓝色输出.

using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace SchedulerExample
{

    class Program
    {
        static void Main(string[] args)
        {
            var mydata = new[] {"A", "B", "C", "D", "E"};
            var observable = Observable.Create<string>(observer =>
                                            {
                                                Console.WriteLine("Observable.Create");
                                                return mydata.ToObservable().
                                                    Subscribe(observer);
                                            });

            observable.
                SubscribeOn(new MyScheduler(ConsoleColor.Red)).
                ObserveOn(new MyScheduler(ConsoleColor.Blue)).
                Subscribe(s => Console.WriteLine("OnNext {0}", s));

            Console.ReadKey();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这输出:

调度

并供参考MyScheduler(不适合实际使用):

using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace SchedulerExample
{
    class MyScheduler : IScheduler
    {
        private readonly ConsoleColor _colour;

        public MyScheduler(ConsoleColor colour)
        {
            _colour = colour;
        }

        public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
        {
            return Execute(state, action);
        }

        private IDisposable Execute<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
        {
            var tmp = Console.ForegroundColor;
            Console.ForegroundColor = _colour;
            action(this, state);
            Console.ForegroundColor = tmp;
            return Disposable.Empty;
        }

        public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            throw new NotImplementedException();
        }

        public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            throw new NotImplementedException();
        }

        public DateTimeOffset Now
        {
            get { return DateTime.UtcNow; }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)