如何在TypeScript中创建扩展Rx.Observable的类?

anj*_*shi 4 rxjs typescript

我已经使用了Definitely Typed中键入的最新版本的rx.js。

当我尝试这个:

class MyObservable extends Rx.Observable<any> { }
Run Code Online (Sandbox Code Playgroud)

我有: A class may only extend another class.

为什么将Observableand Subject等定义为in中的类的接口rx.d.ts

如果要创建扩展Observable或Subject的类,该怎么办?

PS:我希望此类处理特定的域逻辑,因此我需要创建一个新类,而不是直接更新Observable的原型。

谢谢!

Oli*_*old 5

我必须为WebRx解决相同的问题。正如您已经发现的那样,使用Typescript类扩展RxJS的IObservable并不是一种选择,因为Observable被导出为接口。正如我在对史蒂夫·芬顿的回答的评论中提到的那样,创建一个实现Rx.IObservable的类也不会使您走得太远,因为绝大多数Rx运算符都是围绕Rx.Observable接口定义的,该接口是从Rx派生的。可观察的。最后,您将几乎完全重写了Rx.Observable。

在解决更好的方法之前,我解决该问题的方法是使用原型继承扩展内置的Rx.Observable 并通过自定义d.ts文件导出扩展:

RxExtension.ts

var RxObsConstructor = (<any> Rx.Observable);   // this hack is neccessary because the .d.ts for RxJs declares Observable as an interface)

/**
* Creates an read-only observable property with an optional default value from the current (this) observable
* (Note: This is the equivalent to Knockout's ko.computed)
* @param {T} initialValue? Optional initial value, valid until the observable produces a value
*/
RxObsConstructor.prototype.toProperty = function(initialValue?: any, scheduler?: Rx.IScheduler) {
    scheduler = scheduler || Rx.Scheduler.currentThread;

    // initialize accessor function (read-only)
    var accessor: any = (newVal?: any): any => {
        if (arguments.length > 0) {
            internal.throwError("attempt to write to a read-only observable property");
        }

        if (accessor.sub == null) {
            accessor.sub = accessor._source.connect();
        }

        return accessor.value;
    };

    //////////////////////////////////
    // IUnknown implementation

    accessor.queryInterface = (iid: string) => {
        if (iid === IID.IUnknown ||
            iid === IID.IObservableProperty ||
            iid === IID.IDisposable)
            return true;

        return false;
    };

    //////////////////////////////////
    // IDisposable implementation

    accessor.dispose = () => {
        if (accessor.sub) {
            accessor.sub.dispose();
            accessor.sub = null;
        }
    };

    //////////////////////////////////
    // IObservableProperty<T> implementation

    accessor.value = initialValue;

    // setup observables
    accessor.changedSubject = new Rx.Subject<any>();
    accessor.changed = accessor.changedSubject
        .publish()
        .refCount();

    accessor.changingSubject = new Rx.Subject<any>();
    accessor.changing = accessor.changingSubject
        .publish()
        .refCount();

    accessor.source = this;
    accessor.thrownExceptions = internal.createScheduledSubject<Error>(scheduler, app.defaultExceptionHandler);

    //////////////////////////////////
    // implementation

    var firedInitial = false;

    accessor.sub = this
        .distinctUntilChanged()
        .subscribe(x => {
            // Suppress a non-change between initialValue and the first value
            // from a Subscribe
            if (firedInitial && x === accessor.value) {
                return;
            }

            firedInitial = true;

            accessor.changingSubject.onNext(x);
            accessor.value = x;
            accessor.changedSubject.onNext(x);
        }, x=> accessor.thrownExceptions.onNext(x));

    return accessor;
}
Run Code Online (Sandbox Code Playgroud)

RxExtension.d.ts

declare module Rx {
    export interface Observable<T> extends IObservable<T> {
        toProperty(initialValue?: T): wx.IObservableProperty<T>;
    }
}
Run Code Online (Sandbox Code Playgroud)