跟踪Observable中的(观察者数量)?

Jon*_*ter 7 c# system.reactive

我有一个可观察的代表股票价格的流.如果我的可观察序列上没有观察者,我希望能够从提供价格流的远程服务器断开连接,但我不希望这样做,直到每个观察者都调用Dispose().然后以类似的方式,当第一个人调用Subscribe时,我想重新连接到远程服务器.

有没有办法弄清楚有多少观察者在可观察量上调用了订阅?或者也许是一种了解观察者何时调用Subscribe或Dispose的方法?

And*_*mes 10

我只想使用RefCount/Publish.我总觉得如果我正在实施IObservable,我的工作方式太难了.

myColdObservable.Publish().RefCount();
Run Code Online (Sandbox Code Playgroud)

在每个人断开连接后,这将使您的可观察停止脉冲.这是一个示例:

var coldObservable = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .ObserveOn(Scheduler.TaskPool)
    .Select(_ => DoSomething());

var refCountObs = coldObservable.Publish().RefCount();

CompositeDisposable d = new CompositeDisposable();
d.Add(refCountObs.Subscribe(n => Console.WriteLine("First got: " + n)));
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Second got: " + n)));
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Third got: " + n)));

//Wait a bit for work to happen
System.Threading.Thread.Sleep(10000);

//Everyone unsubscribes
d.Dispose();

//Observe that DoSomething is not called.
System.Threading.Thread.Sleep(3000);
Run Code Online (Sandbox Code Playgroud)

这并不包括您实际想知道订户数量的情况,但我认为如果没有订阅者,这符合您停止工作的要求.


Rol*_*ant 6

有点旧,但我遇到了这个帖子,因为我有一个问题,我需要知道订阅者的数量.使用Bart的建议我想出了这个扩展.

public static IObservable<T> CountSubscribers<T>(this IObservable<T> source, Action<int> countChanged)
{
 int count = 0;

 return Observable.Defer(() =>
 {
    count = Interlocked.Increment(ref count);
    countChanged(count);
    return source.Finally(() =>
     {
        count = Interlocked.Decrement(ref count);
        countChanged(count);
     });
 });
}
Run Code Online (Sandbox Code Playgroud)


Muh*_*han 3

IObservable<T>是一个可以实现的接口。在接口的 Subscribe 方法中,您可以通过内部维护列表来跟踪观察者。

以下代码片段来自 MSDN。

private List<IObserver<Location>> observers;

public IDisposable Subscribe(IObserver<Location> observer) 
{
   if (! observers.Contains(observer)) 
      observers.Add(observer);

   // ------- If observers.Count == 1 create connection. -------

   return new Unsubscriber(observers, observer);
}
private class Unsubscriber : IDisposable
{
   private List<IObserver<Location>>_observers;
   private IObserver<Location> _observer;

   public Unsubscriber(List<IObserver<Location>> observers, IObserver<Location> observer)
   {
      this._observers = observers;
      this._observer = observer;
   }

   public void Dispose()
   {
      if (_observer != null && _observers.Contains(_observer))
         _observers.Remove(_observer);
      // ----------- if observers.Count == 0 close connection -----------
   }
}
Run Code Online (Sandbox Code Playgroud)