Mic*_*ael 5 design-patterns observable observer-pattern rxjs rxjs-observables
我是 RXJS 库的新手用户,正在尝试弄清楚如何正确使用 Observable 和subjects。我试图将其与模式设计观察者进行比较。在某些时候,我有一个问题,RXJS 库中的 Observable 实例是否是观察者模式设计的特例?
根据定义, AnObservable是随时间发出数据的实体。这听起来有点模糊,但同时又非常有趣。
在我看来,RxJS 的所有魔力都是通过链表实现的。
每当您创建Observableusing时new Observable(subscriber => {}),您都在定义源或链表的HEAD节点。另外,你有没有想过为什么这个参数被称为subscriberor observer?我也会尝试分享我对此的看法。
主链表是在以下命令的帮助下创建的Observable.pipe():
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
return operations.length ? pipeFromArray(operations)(this) : this;
}
export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
if (fns.length === 0) {
return identity as UnaryFunction<any, any>;
}
if (fns.length === 1) {
return fns[0];
}
return function piped(input: T): R {
return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
};
}
Run Code Online (Sandbox Code Playgroud)
protected lift<R>(operator?: Operator<T, R>): Observable<R> {
const observable = new Observable<R>();
observable.source = this;
observable.operator = operator;
return observable;
}
Run Code Online (Sandbox Code Playgroud)
如您所知,RxJS 中有很多运算符。Anoperator是一个返回另一个函数的函数,该函数的参数是 an Observable(类型T),并且其返回值也是 an Observable(类型R)。
例如,map():
export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
return function mapOperation(source: Observable<T>): Observable<R> {
if (typeof project !== 'function') {
throw new TypeError('argument is not a function. Are you looking for `mapTo()`?');
}
return lift(source, new MapOperator(project, thisArg));
};
}
Run Code Online (Sandbox Code Playgroud)
所以,当你有
const src$ = new Observable(s => /* ... */)
.pipe(
map(/* ... */)
)
Run Code Online (Sandbox Code Playgroud)
将会发生一些事情:
Observable实例;提供的回调(在本例中s => ...)将存储在_subscribe属性中pipe()被调用;它将返回fns[0],在本例中是mapOperation函数mapOperation将以Observable实例作为其参数来调用(来自pipeFromArray(operations)(this));当调用时,它会调用source.lift(new MapOperator(project, thisArg));; Observable.lift()是将节点添加到该链表中的内容;正如你所看到的,一个节点(除了HEAD)保存source和operator代表它当您订阅 时src$,将根据此列表创建另一个列表。在这个中,每个节点都是一个Subscriber. 创建这个列表是基于每个都operator 必须有一个call方法
export interface Operator<T, R> {
call(subscriber: Subscriber<R>, source: any): TeardownLogic;
}
Run Code Online (Sandbox Code Playgroud)
export class MapOperator<T, R> implements Operator<T, R> {
constructor(private project: (value: T, index: number) => R, private thisArg: any) {
}
call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new MapSubscriber(subscriber, this.project, this.thisArg));
}
}
Run Code Online (Sandbox Code Playgroud)
节点之间的关系Subscriber建立在Observable.subscribe()
在这种情况下, (上面的示例)s中的参数new Observable(s => ...)将是MapSubscriber.
看起来我似乎偏离了这个问题,但通过上述解释,我想证明这里没有太多模式Observer。
这种模式可以通过 a 来实现Subject,它扩展了Observable:
export class Subject<T> extends Observable<T> implements SubscriptionLike { }
Run Code Online (Sandbox Code Playgroud)
这意味着您可以使用Subject.pipe(...)和Subject.subscribe(subscriber)。Subject为了实现这种模式,需要一个自定义 _subscribe方法:
_subscribe(subscriber: Subscriber<T>): Subscription {
if (this.closed) {
throw new ObjectUnsubscribedError();
} else if (this.hasError) {
subscriber.error(this.thrownError);
return Subscription.EMPTY;
} else if (this.isStopped) {
subscriber.complete();
return Subscription.EMPTY;
} else {
// !!!
this.observers.push(subscriber);
return new SubjectSubscription(this, subscriber);
}
}
Run Code Online (Sandbox Code Playgroud)
正如你所看到的,该类Subject跟踪它的观察者(订阅者),这样当它发出一个值时,Subject.next()它的所有观察者都会收到它:
next(value: T) {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
if (!this.isStopped) {
const { observers } = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) {
copy[i].next(value!);
}
}
}
Run Code Online (Sandbox Code Playgroud)
作为侧节点,a也Subject可以充当 a Subscriber,因此您不必一直手动调用Subject.{next, error, complete}()。你可以用这样的东西来实现
src$.pipe(subjectInstance);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1470 次 |
| 最近记录: |