用运算符扩展RxJS Observable类

Est*_*ask 5 javascript rxjs typescript ecmascript-6 rxjs5

如何Observable通过向其应用内置的RxJS运算符来扩展类?

我想做这样的事情:

class TruthyObservable extends Observable {
  constructor(subscriber) {
    super(subscriber);

    return this.filter(x => x);
  }
}

class TruthyMappedObservable extends TruthyObservable {
  constructor(subscriber) {
    super(subscriber);

    return this.map(x => `'${x}'`);
  }
}
Run Code Online (Sandbox Code Playgroud)

可以在没有构造函数返回的情况下完成此操作吗?

mar*_*tin 3

这很大程度上取决于您想要做什么,但假设您想要创建一个TruthyObservable行为非常类似于默认值Observable.create(...)但仅传递偶数的函数:

import { Observable, Observer, Subscriber, Subject, Subscription } from 'rxjs';
import 'rxjs/add/operator/filter';

class TruthyObservable<T> extends Observable<T> {

    constructor(subscribe?: <R>(this: Observable<T>, subscriber: Subscriber<R>) => any) {
        if (subscribe) {
            let oldSubscribe = subscribe;
            subscribe = (obs: Subscriber<any>) => {
                obs = this.appendOperators(obs);
                return oldSubscribe.call(this, obs);
            };
        }

        super(subscribe);
    }

    private appendOperators(obs: Subscriber<any>) {
        let subject = new Subject();

        subject
            .filter((val: number) => val % 2 == 0)
            .subscribe(obs);

        return new Subscriber(subject);
    }

}

let o = new TruthyObservable<number>((obs: Observer<number>) => {
    obs.next(3);
    obs.next(6);
    obs.next(7);
    obs.next(8);
});

o.subscribe(val => console.log(val));
Run Code Online (Sandbox Code Playgroud)

这会打印到控制台:

6
8
Run Code Online (Sandbox Code Playgroud)

查看现场演示:https://jsbin.com/recuto/3/edit ?js,console

通常,继承的类Observable会覆盖_subscribe()实际在内部进行订阅的方法,但在我们的例子中,我们希望使用回调,我们可以自己发出值(因为此 Observable 本身不发出任何内容)。如果方法存在,则它_subscribe()会被属性所掩盖_subscribe,因此如果我们只是覆盖此方法,我们将无法向其附加任何运算符。这就是为什么我用_subscribe另一个函数包装构造函数,然后通过Subject链式 with filter()inappendOperators()方法传递所有值。Subject请注意,我用at替换了原来的 Observer obs = this.appendOperators(obs)

最后当我打电话时。obs.next(3);事实上,我将值推送到Subject过滤它们并将它们传递到原始Observer.