如何监视RXJS订阅数?

vit*_*y-t 21 node.js rxjs typescript

我正在使用Observable来为来自全局资源的客户端提供事件订阅界面,并且我需要根据活动订阅的数量来管理该资源:

  • 订阅数大于0时分配全局资源
  • 订阅数变为0时释放全局资源
  • 根据订阅数调整资源使用策略

RXJS中监视活动订阅数的正确方法是什么?


如何在RXJS语法中实现以下内容?--

const myEvent: Observable<any> = new Observable();

myEvent.onSubscription((newCount: number, prevCount: number) => {
   if(newCount === 0) {
      // release global resource
   } else {
      // allocate global resource, if not yet allocated
   }
   // for a scalable resource usage / load,
   // re-configure it, based on newCount
});
Run Code Online (Sandbox Code Playgroud)

我不希望在每次更改时都得到保证的通知,因此newCount+ prevCount参数。

更新1

这是不是重复到这个,因为我需要时通知用户数量的变化,而不是只在某个时刻拿到专柜。

更新2

到目前为止,没有任何答案,我很快提出了一个非常丑陋且有限的解决方法,它是通过完全封装(特别是针对type)进行的Subject。希望能找到合适的解决方案。

更新3

经过几个回答,我仍然不确定如何实现我正在尝试的方法,如下所示:

class CustomType {

}

class CountedObservable<T> extends Observable<T> {

    private message: string; // random property

    public onCount; // magical Observable that needs to be implemented

    constructor(message: string) {
        // super(); ???
        this.message = message;
    }

    // random method
    public getMessage() {
        return this.message;
    }
}

const a = new CountedObservable<CustomType>('hello'); // can create directly

const msg = a.getMessage(); // can call methods

a.subscribe((data: CustomType) => {
    // handle subscriptions here;
});

// need that magic onCount implemented, so I can do this:
a.onCount.subscribe((newCount: number, prevCont: number) => {
    // manage some external resources
});
Run Code Online (Sandbox Code Playgroud)

如何实现CountedObservable上面的此类,让我订阅自己的类以及onCount监视其客户/订阅数量的属性?

更新4

所有建议的解决方案似乎都过于复杂,即使我接受了其中一个答案,最终还是得到了自己完全定制解决方案

kos*_*kos 5

您可以使用defer跟踪订阅并完成以跟踪完成,例如以操作员的身份来实现:

// a custom operator that will count number of subscribers
function customOperator(onCountUpdate = noop) {
  return function refCountOperatorFunction(source$) {
    let counter = 0;

    return defer(()=>{
      counter++;
      onCountUpdate(counter);
      return source$;
    })
    .pipe(
      finalize(()=>{
        counter--;
        onCountUpdate(counter);
      })
    );
  };
}

// just a stub for `onCountUpdate`
function noop(){}
Run Code Online (Sandbox Code Playgroud)

然后像这样使用它:

const source$ = new Subject();

const result$ = source$.pipe(
  customOperator( n => console.log('Count updated: ', n) )
);
Run Code Online (Sandbox Code Playgroud)

以下是说明此情况的代码段:

const { Subject, of, timer, pipe, defer } = rxjs;
const { finalize, takeUntil } = rxjs.operators;


const source$ = new Subject();

const result$ = source$.pipe(
  customOperator( n => console.log('Count updated: ', n) )
);

// emit events
setTimeout(()=>{
  source$.next('one');
}, 250);

setTimeout(()=>{
  source$.next('two');
}, 1000);

setTimeout(()=>{
  source$.next('three');
}, 1250);

setTimeout(()=>{
  source$.next('four');
}, 1750);


// subscribe and unsubscribe
const subscriptionA = result$
  .subscribe(value => console.log('A', value));

setTimeout(()=>{
  result$.subscribe(value => console.log('B', value));
}, 500);


setTimeout(()=>{
  result$.subscribe(value => console.log('C', value));
}, 1000);

setTimeout(()=>{
  subscriptionA.unsubscribe();
}, 1500);


// complete source
setTimeout(()=>{
  source$.complete();
}, 2000);


function customOperator(onCountUpdate = noop) {
  return function refCountOperatorFunction(source$) {
    let counter = 0;

    return defer(()=>{
      counter++;
      onCountUpdate(counter);
      return source$;
    })
    .pipe(
      finalize(()=>{
        counter--;
        onCountUpdate(counter);
      })
    );
  };
}

function noop(){}
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@6.4.0/bundles/rxjs.umd.min.js"></script>
Run Code Online (Sandbox Code Playgroud)

*注意:如果您的原始币很冷—您可能需要共享它。

希望能帮助到你

  • **对于那些稍后阅读本文的人:**一定要查看 *paulpdaniels* 给出的“using+share”方法的详细概述,以及 *dmcgrandle* 的做法,通过从“Observable”扩展的自定义类向我们展示了一个很好的解决方案。当然可以看到 *vitaly-t* 在他的答案中最终得到了什么[这里](/sf/answers/3943688381/),这可能更容易由开发团队维护,因为它更可定制,并且透明的。 (2认同)

dmc*_*dle 5

多么有趣的问题!如果我理解您的要求,这是我的解决方案:围绕 Observable 创建一个包装类,通过拦截subscribe()和来跟踪订阅unsubscribe()。这是包装类:

export class CountSubsObservable<T> extends Observable<T>{
  private _subCount = 0;
  private _subCount$: BehaviorSubject<number> = new BehaviorSubject(0);
  public subCount$ = this._subCount$.asObservable();

  constructor(public source: Observable<T>) {
    super();
  }

  subscribe(
    observerOrNext?: PartialObserver<T> | ((value: T) => void), 
    error?: (error: any) => void, 
    complete?: () => void 
  ): Subscription {
    this._subCount++;
    this._subCount$.next(this._subCount);
    let subscription = super.subscribe(observerOrNext as any, error, complete);
    const newUnsub: () => void = () => {
      if (this._subCount > 0) {
        this._subCount--;
        this._subCount$.next(this._subCount);
        subscription.unsubscribe();
      }
    }
    subscription.unsubscribe = newUnsub;
    return subscription;
  }
}
Run Code Online (Sandbox Code Playgroud)

这个包装器创建了一个.subCount$可以订阅的辅助 observable ,它会在每次订阅源 observable 的数量发生变化时发出。它将发出一个与当前订阅者数量相对应的数字。

要使用它,您将创建一个源可观察对象,然后使用此类调用 new 来创建包装器。例如:

const source$ = interval(1000).pipe(take(10));

const myEvent$: CountSubsObservable<number> = new CountSubsObservable(source$);

myEvent$.subCount$.subscribe(numSubs => {
  console.log('subCount$ notification! Number of subscriptions is now', numSubs);
  if(numSubs === 0) {
    // release global resource
  } else {
    // allocate global resource, if not yet allocated
  }
  // for a scalable resource usage / load,
  // re-configure it, based on numSubs
});

source$.subscribe(result => console.log('result is ', result));
Run Code Online (Sandbox Code Playgroud)

要查看它的使用情况,请查看此Stackblitz

更新:

好的,正如评论中提到的,我有点难以理解数据流的来源。回顾你的问题,我看到你提供了一个“事件订阅界面”。如果数据流CustomType是您在上面的第三次更新中详细说明的流,那么您可能希望使用fromEvent()fromrxjs创建源可观察对象,您可以使用它来调用我提供的包装类。

为了说明这一点,我创建了一个新的Stackblitz。从 Stackblitz 这里是CustomTypes流以及我将如何使用 CountedObservable 类来实现您正在寻找的内容。

class CustomType {
  a: string;
}

const dataArray = [
  { a: 'January' },
  { a: 'February' },
  { a: 'March' },
  { a: 'April' },
  { a: 'May' },
  { a: 'June' },
  { a: 'July' },
  { a: 'August' },
  { a: 'September' },
  { a: 'October' },
  { a: 'November' },
  { a: 'December' }
] as CustomType[];

// Set up an arbitrary source that sends a stream of `CustomTypes`, one
// every two seconds by using `interval` and mapping the numbers into
// the associated dataArray.  
const source$ = interval(2000).pipe(
  map(i => dataArray[i]), // transform the Observable stream into CustomTypes
  take(dataArray.length),  // limit the Observable to only emit # array elements
  share() // turn into a hot Observable.
);

const myEvent$: CountedObservable<CustomType> = new CountedObservable(source$);

myEvent$.onCount.subscribe(newCount => {
  console.log('newCount notification! Number of subscriptions is now', newCount);
});
Run Code Online (Sandbox Code Playgroud)

我希望这有帮助。


pau*_*els 5

您实际上是在问三个独立的问题,我想知道您是否真的需要您提到的全部功能。由于您要求的大多数资源管理资料已经由库提供,因此执行自定义跟踪代码似乎是多余的。前两个问题:

  • 订阅数大于0时分配全局资源
  • 订阅数变为0时释放全局资源

可以使用using+ share运算符完成:

class ExpensiveResource {
  constructor () {
    // Do construction
  }
  unsubscribe () {
   // Do Tear down
  }
}

// Creates a resource and ties its lifecycle with that of the created `Observable`
// generated by the second factory function
// Using will accept anything that is "Subscription-like" meaning it has a unsubscribe function.
const sharedStream$ = using(
  // Creates an expensive resource
  () => new ExpensiveResource(), 
  // Passes that expensive resource to an Observable factory function
  er => timer(1000)
)
// Share the underlying source so that global creation and deletion are only
// processed when the subscriber count changes between 0 and 1 (or visa versa)
.pipe(share())
Run Code Online (Sandbox Code Playgroud)

之后,sharedStream$可以作为基本流来传递该基本流,该流将管理基础资源(假设您unsubscribe正确实现了您的资源),以便随着订户数在0到1之间转换,将创建并拆除该资源。

  • 根据订阅数调整资源使用策略

    我最怀疑的第三个问题是,为了完整起见,我会回答这个问题,假设您比我更了解您的应用程序(因为我想不出为什么您需要在不同的使用级别进行特定处理的原因,除了0和1)。

基本上,我将使用与上述类似的方法,但是我将略微不同地概括过渡逻辑。

// Same as above
class ExpensiveResource {
  unsubscribe() {  console.log('Tear down this resource!')}
}

const usingReferenceTracking = 
  (onUp, onDown) => (resourceFactory, streamFactory) => {
    let instance, refCount = 0
    // Again manage the global resource state with using
    const r$ = using(
      // Unfortunately the using pattern doesn't let the resource escape the closure
      // so we need to cache it for ourselves to use later
      () => instance || (instance = resourceFactory()),
      // Forward stream creation as normal
      streamFactory
      )
    ).pipe(
      // Don't forget to clean up the stream after all is said and done
      // Because its behind a share this should only happen when all subscribers unsubscribe
      finalize(() => instance = null)
      share()
    )
    // Use defer to trigger "onSubscribe" side-effects
    // Note as well that these side-effects could be merged with the above for improved performance
    // But I prefer them separate for easier maintenance.
    return defer(() => onUp(instance, refCount += 1) || r$)
      // Use finalize to handle the "onFinish" side-effects
      .pipe(finalize(() => onDown(instance, refCount -= 1)))

}

const referenceTracked$ = usingReferenceTracking(
  (ref, count) => console.log('Ref count increased to ' + count),
  (ref, count) => console.log('Ref count decreased to ' + count)
)(
  () => new ExpensiveResource(),
  ref => timer(1000)
)

referenceTracked$.take(1).subscribe(x => console.log('Sub1 ' +x))
referenceTracked$.take(1).subscribe(x => console.log('Sub2 ' +x))


// Ref count increased to 1
// Ref count increased to 2
// Sub1 0
// Ref count decreased to 1
// Sub2 0
// Ref count decreased to 0
// Tear down this resource!
Run Code Online (Sandbox Code Playgroud)

警告:这样做的副作用是,根据定义,流离开usingReferenceTracking函数后将变热,并且在首次订阅时将变热。确保在订阅阶段考虑到这一点。