反应性:试图了解Subject <T>的工作原理

Jas*_*per 3 c# system.reactive

试图了解如何Subject<T>,ReplaySubject<T>和其他工作.这是一个例子:

(主题是观察者观察者)

public IObservable<int> CreateObservable()
{
     Subject<int> subj = new Subject<int>();                // case 1
     ReplaySubject<int> subj = new ReplaySubject<int>();    // case 2

     Random rnd = new Random();
     int maxValue = rnd.Next(20);
     Trace.TraceInformation("Max value is: " + maxValue.ToString());

     subj.OnNext(-1);           // specific value

     for(int iCounter = 0; iCounter < maxValue; iCounter++)
     {
          Trace.TraceInformation("Value: " + iCounter.ToString() + " is about to publish");
          subj.OnNext(iCounter);
      }

      Trace.TraceInformation("Publish complete");
      subj.OnComplete();

      return subj;
 }

 public void Main()
 {
     //
     // First subscription
     CreateObservable()
         .Subscribe(
               onNext: (x)=>{  
                  Trace.TraceInformation("X is: " + x.ToString()); 
      });

     //
     // Second subscribe
     CreateObservable()
         .Subscribe(
               onNext: (x2)=>{  
                  Trace.TraceInformation("X2 is: " + x.ToString());
     });
Run Code Online (Sandbox Code Playgroud)

案例1:奇怪的情况是 - 当我使用Subject<T>没有订阅时(???) - 我从未看到"X是:"文本 - 我只看到"值是:"和"最大值是"...为什么不Subject<T>推动订阅价值?

案例2:如果我使用ReplaySubject<T>- 我确实看到了Subscription中的值,但我无法Defer对任何内容应用选项.不是Subject和不是Observable ....所以每个订阅都会收到不同的值,因为CreateObservable函数是可观察的.在哪里Defer

小智 10

无论什么时候你需要凭空创造一个可观察的,Observable.Create应该是第一个想到的东西.受试者在两种情况下进入图片:

  • 您需要某种"可寻址端点"来提供数据,以便所有订户都能接收数据.将此与具有调用端(通过委托调用)和订阅端(通过委托与+ - 和 - =语法组合)的.NET事件进行比较.在很多情况下,你会发现使用Observable.Create可以达到同样的效果.

  • 您需要在查询管道中多播消息,有效地在查询逻辑中通过许多分叉共享可观察序列,而不会触发多个预订.(想想订阅你最喜欢的杂志一次为你的宿舍,并把照片复印机放在信箱后面.你仍然支付一个订阅,虽然所有的朋友都可以通过信箱上的OnNext阅读杂志.)

此外,在很多情况下,Rx中已经有一个内置的原语可以完全满足您的需求.例如,有来自*工厂方法来桥接现有概念(例如事件,任务,异步方法,可枚举序列),其中一些使用了主题.对于第二种多播逻辑,有发布,重放等运算符系列.


Eni*_*ity 6

您需要注意代码的执行时间。

在“情况 1”中,当您使用 a 时,您会注意到对&Subject<T>的所有调用都在该方法返回可观察值之前完成。由于您使用的是 a,这意味着任何后续订阅都将错过所有值,因此您应该期望得到您所得到的 - 什么也没有。OnNextOnCompletedCreateObservableSubject<T>

您必须延迟对主题的操作,直到观察者订阅为止。使用方法来做到这一点Create。就是这样:

public IObservable<int> CreateObservable()
{
    return Observable.Create<int>(o =>
    {
        var subj = new Subject<int>();
        var disposable = subj.Subscribe(o);

        var rnd = new Random();
        var maxValue = rnd.Next(20);
        subj.OnNext(-1);
        for(int iCounter = 0; iCounter < maxValue; iCounter++)
        {
            subj.OnNext(iCounter);
        }
        subj.OnCompleted();

        return disposable;
    });
}
Run Code Online (Sandbox Code Playgroud)

为了简洁起见,我删除了所有跟踪代码。

因此,现在,对于每个订阅者,您都会在方法内重新执行代码Create,并且现在可以从内部获取值Subject<T>

使用该Create方法通常是创建从方法返回的可观察量的正确方法。

或者,您可以使用 aReplaySubject<T>并避免使用该Create方法。然而,由于多种原因,这没有吸引力。它强制在创建时计算整个序列。这为您提供了一个冷的可观察对象,您可以在不使用重放主题的情况下更有效地生成它。

现在,顺便说一句,您应该尽量避免使用主题。一般规则是,如果你使用一个主题,那么你就做错了。该CreateObservable方法最好写成这样:

public IObservable<int> CreateObservable()
{
    return Observable.Create<int>(o =>
    {
        var rnd = new Random();
        var maxValue = rnd.Next(20);
        return Observable.Range(-1, maxValue + 1).Subscribe(o);
    });
}
Run Code Online (Sandbox Code Playgroud)

根本不需要主题。

让我知道这是否有帮助。