vit*_*y-t 21 node.js rxjs typescript
我正在使用Observable来为来自全局资源的客户端提供事件订阅界面,并且我需要根据活动订阅的数量来管理该资源:
RXJS中监视活动订阅数的正确方法是什么?
如何在RXJS语法中实现以下内容?--
const myEvent: Observable<any> = new Observable();
myEvent.onSubscription((newCount: number, prevCount: number) => {
if(newCount === 0) {
// release global resource
} else {
// allocate global resource, if not yet allocated
}
// for a scalable resource usage / load,
// re-configure it, based on newCount
});
Run Code Online (Sandbox Code Playgroud)
我不希望在每次更改时都得到保证的通知,因此newCount+ prevCount参数。
更新1
这是不是重复到这个,因为我需要时通知用户数量的变化,而不是只在某个时刻拿到专柜。
更新2
到目前为止,没有任何答案,我很快提出了一个非常丑陋且有限的解决方法,它是通过完全封装(特别是针对type)进行的Subject。希望能找到合适的解决方案。
更新3
经过几个回答,我仍然不确定如何实现我正在尝试的方法,如下所示:
class CustomType {
}
class CountedObservable<T> extends Observable<T> {
private message: string; // random property
public onCount; // magical Observable that needs to be implemented
constructor(message: string) {
// super(); ???
this.message = message;
}
// random method
public getMessage() {
return this.message;
}
}
const a = new CountedObservable<CustomType>('hello'); // can create directly
const msg = a.getMessage(); // can call methods
a.subscribe((data: CustomType) => {
// handle subscriptions here;
});
// need that magic onCount implemented, so I can do this:
a.onCount.subscribe((newCount: number, prevCont: number) => {
// manage some external resources
});
Run Code Online (Sandbox Code Playgroud)
如何实现CountedObservable上面的此类,让我订阅自己的类以及onCount监视其客户/订阅数量的属性?
更新4
您可以使用defer跟踪订阅并完成以跟踪完成,例如以操作员的身份来实现:
// a custom operator that will count number of subscribers
function customOperator(onCountUpdate = noop) {
return function refCountOperatorFunction(source$) {
let counter = 0;
return defer(()=>{
counter++;
onCountUpdate(counter);
return source$;
})
.pipe(
finalize(()=>{
counter--;
onCountUpdate(counter);
})
);
};
}
// just a stub for `onCountUpdate`
function noop(){}
Run Code Online (Sandbox Code Playgroud)
然后像这样使用它:
const source$ = new Subject();
const result$ = source$.pipe(
customOperator( n => console.log('Count updated: ', n) )
);
Run Code Online (Sandbox Code Playgroud)
以下是说明此情况的代码段:
const { Subject, of, timer, pipe, defer } = rxjs;
const { finalize, takeUntil } = rxjs.operators;
const source$ = new Subject();
const result$ = source$.pipe(
customOperator( n => console.log('Count updated: ', n) )
);
// emit events
setTimeout(()=>{
source$.next('one');
}, 250);
setTimeout(()=>{
source$.next('two');
}, 1000);
setTimeout(()=>{
source$.next('three');
}, 1250);
setTimeout(()=>{
source$.next('four');
}, 1750);
// subscribe and unsubscribe
const subscriptionA = result$
.subscribe(value => console.log('A', value));
setTimeout(()=>{
result$.subscribe(value => console.log('B', value));
}, 500);
setTimeout(()=>{
result$.subscribe(value => console.log('C', value));
}, 1000);
setTimeout(()=>{
subscriptionA.unsubscribe();
}, 1500);
// complete source
setTimeout(()=>{
source$.complete();
}, 2000);
function customOperator(onCountUpdate = noop) {
return function refCountOperatorFunction(source$) {
let counter = 0;
return defer(()=>{
counter++;
onCountUpdate(counter);
return source$;
})
.pipe(
finalize(()=>{
counter--;
onCountUpdate(counter);
})
);
};
}
function noop(){}Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@6.4.0/bundles/rxjs.umd.min.js"></script>Run Code Online (Sandbox Code Playgroud)
*注意:如果您的原始币很冷—您可能需要共享它。
希望能帮助到你
多么有趣的问题!如果我理解您的要求,这是我的解决方案:围绕 Observable 创建一个包装类,通过拦截subscribe()和来跟踪订阅unsubscribe()。这是包装类:
export class CountSubsObservable<T> extends Observable<T>{
private _subCount = 0;
private _subCount$: BehaviorSubject<number> = new BehaviorSubject(0);
public subCount$ = this._subCount$.asObservable();
constructor(public source: Observable<T>) {
super();
}
subscribe(
observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void
): Subscription {
this._subCount++;
this._subCount$.next(this._subCount);
let subscription = super.subscribe(observerOrNext as any, error, complete);
const newUnsub: () => void = () => {
if (this._subCount > 0) {
this._subCount--;
this._subCount$.next(this._subCount);
subscription.unsubscribe();
}
}
subscription.unsubscribe = newUnsub;
return subscription;
}
}
Run Code Online (Sandbox Code Playgroud)
这个包装器创建了一个.subCount$可以订阅的辅助 observable ,它会在每次订阅源 observable 的数量发生变化时发出。它将发出一个与当前订阅者数量相对应的数字。
要使用它,您将创建一个源可观察对象,然后使用此类调用 new 来创建包装器。例如:
const source$ = interval(1000).pipe(take(10));
const myEvent$: CountSubsObservable<number> = new CountSubsObservable(source$);
myEvent$.subCount$.subscribe(numSubs => {
console.log('subCount$ notification! Number of subscriptions is now', numSubs);
if(numSubs === 0) {
// release global resource
} else {
// allocate global resource, if not yet allocated
}
// for a scalable resource usage / load,
// re-configure it, based on numSubs
});
source$.subscribe(result => console.log('result is ', result));
Run Code Online (Sandbox Code Playgroud)
要查看它的使用情况,请查看此Stackblitz。
好的,正如评论中提到的,我有点难以理解数据流的来源。回顾你的问题,我看到你提供了一个“事件订阅界面”。如果数据流CustomType是您在上面的第三次更新中详细说明的流,那么您可能希望使用fromEvent()fromrxjs创建源可观察对象,您可以使用它来调用我提供的包装类。
为了说明这一点,我创建了一个新的Stackblitz。从 Stackblitz 这里是CustomTypes流以及我将如何使用 CountedObservable 类来实现您正在寻找的内容。
class CustomType {
a: string;
}
const dataArray = [
{ a: 'January' },
{ a: 'February' },
{ a: 'March' },
{ a: 'April' },
{ a: 'May' },
{ a: 'June' },
{ a: 'July' },
{ a: 'August' },
{ a: 'September' },
{ a: 'October' },
{ a: 'November' },
{ a: 'December' }
] as CustomType[];
// Set up an arbitrary source that sends a stream of `CustomTypes`, one
// every two seconds by using `interval` and mapping the numbers into
// the associated dataArray.
const source$ = interval(2000).pipe(
map(i => dataArray[i]), // transform the Observable stream into CustomTypes
take(dataArray.length), // limit the Observable to only emit # array elements
share() // turn into a hot Observable.
);
const myEvent$: CountedObservable<CustomType> = new CountedObservable(source$);
myEvent$.onCount.subscribe(newCount => {
console.log('newCount notification! Number of subscriptions is now', newCount);
});
Run Code Online (Sandbox Code Playgroud)
我希望这有帮助。
您实际上是在问三个独立的问题,我想知道您是否真的需要您提到的全部功能。由于您要求的大多数资源管理资料已经由库提供,因此执行自定义跟踪代码似乎是多余的。前两个问题:
可以使用using+ share运算符完成:
class ExpensiveResource {
constructor () {
// Do construction
}
unsubscribe () {
// Do Tear down
}
}
// Creates a resource and ties its lifecycle with that of the created `Observable`
// generated by the second factory function
// Using will accept anything that is "Subscription-like" meaning it has a unsubscribe function.
const sharedStream$ = using(
// Creates an expensive resource
() => new ExpensiveResource(),
// Passes that expensive resource to an Observable factory function
er => timer(1000)
)
// Share the underlying source so that global creation and deletion are only
// processed when the subscriber count changes between 0 and 1 (or visa versa)
.pipe(share())
Run Code Online (Sandbox Code Playgroud)
之后,sharedStream$可以作为基本流来传递该基本流,该流将管理基础资源(假设您unsubscribe正确实现了您的资源),以便随着订户数在0到1之间转换,将创建并拆除该资源。
根据订阅数调整资源使用策略
我最怀疑的第三个问题是,为了完整起见,我会回答这个问题,假设您比我更了解您的应用程序(因为我想不出为什么您需要在不同的使用级别进行特定处理的原因,除了0和1)。
基本上,我将使用与上述类似的方法,但是我将略微不同地概括过渡逻辑。
// Same as above
class ExpensiveResource {
unsubscribe() { console.log('Tear down this resource!')}
}
const usingReferenceTracking =
(onUp, onDown) => (resourceFactory, streamFactory) => {
let instance, refCount = 0
// Again manage the global resource state with using
const r$ = using(
// Unfortunately the using pattern doesn't let the resource escape the closure
// so we need to cache it for ourselves to use later
() => instance || (instance = resourceFactory()),
// Forward stream creation as normal
streamFactory
)
).pipe(
// Don't forget to clean up the stream after all is said and done
// Because its behind a share this should only happen when all subscribers unsubscribe
finalize(() => instance = null)
share()
)
// Use defer to trigger "onSubscribe" side-effects
// Note as well that these side-effects could be merged with the above for improved performance
// But I prefer them separate for easier maintenance.
return defer(() => onUp(instance, refCount += 1) || r$)
// Use finalize to handle the "onFinish" side-effects
.pipe(finalize(() => onDown(instance, refCount -= 1)))
}
const referenceTracked$ = usingReferenceTracking(
(ref, count) => console.log('Ref count increased to ' + count),
(ref, count) => console.log('Ref count decreased to ' + count)
)(
() => new ExpensiveResource(),
ref => timer(1000)
)
referenceTracked$.take(1).subscribe(x => console.log('Sub1 ' +x))
referenceTracked$.take(1).subscribe(x => console.log('Sub2 ' +x))
// Ref count increased to 1
// Ref count increased to 2
// Sub1 0
// Ref count decreased to 1
// Sub2 0
// Ref count decreased to 0
// Tear down this resource!
Run Code Online (Sandbox Code Playgroud)
警告:这样做的副作用是,根据定义,流离开usingReferenceTracking函数后将变热,并且在首次订阅时将变热。确保在订阅阶段考虑到这一点。
| 归档时间: |
|
| 查看次数: |
749 次 |
| 最近记录: |