将RxJS Observable收集到数组中

apo*_*nur 5 reactive-extensions-js rxjs

我想使用RxJS与同步世界"桥接"事件的异步世界.具体来说,我想创建一个函数,该函数返回在某个时间间隔内收集的事件数组.

我可以创建Observable来做我想要的

var source = Rx.Observable
.interval(100 /* ms */)
.bufferWithTime(1000).take(1)
Run Code Online (Sandbox Code Playgroud)

我可以打印正确的值

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + JSON.stringify(x));
    },
    function () {
        console.log('Completed');   
    });
Run Code Online (Sandbox Code Playgroud)

这打印

[0,1,2,3,4,5,6,7,8] 
Completed 
Run Code Online (Sandbox Code Playgroud)

但我想要的是将此数组赋值给变量.从概念上讲,我想要的东西

var collectedDuringSecond = source.toPromise.getValue()

这个想法是getValue会阻塞所以在上面的行完成之后collectDuringSecond将包含[0,1,2,3,4,5,6,7,8]

cwh*_*ris 6

JavaScript中的同步事件编程具有很高的限制性.事实上,在很多情况下可能是不可能的.我尝试用Rx进行攻击,看看我是否可以在不修改Rx源的情况下提供同步接口,并且(有充分理由)使用直接JavaScript是不可能的.

我建议将Observable作为API的一部分公开,并允许消费者从那里处理它(当然要轻推使用Rx).

function MyClass () {

    this.getArrayOfStuffAsObservable = function () {
        return Rx.Observable.interval(100)
            .bufferWithTime(1000).take(1);
    };

    // this is optional and I don't recommend it, since you already have Rx available.
    // additionally, consumers will probably miss the fact that you can dispose
    // of the subscription.
    this.getArrayOfStuff = function (callback) {
        var value;
        return this.getArrayOfStuffAsObservable()
            .subscribe(
                function (x) {
                    value = x;
                },
                function (err) {
                    callback(err);
                },
                function () {
                    if (hasValue) {
                        callback(undefined, value);
                    } else {
                        callback('did not receive value');
                    }
                });

    };
};
Run Code Online (Sandbox Code Playgroud)

作为一个附加的注释,您可能要使用toArray结合take,而不是bufferWithTime为这个具体的例子(它实际上做同一件事的两个方面,但一个是基于时间和基于项目计数等).toArray创建一个Observable,它将收集底层observable的所有值,并在底层Observable完成时将这些值作为数组生成.

this.getArrayOfStuffAsObservable = function () {
    return Rx.Observable.interval(100)
        .take(10)
        .toArray();
};
Run Code Online (Sandbox Code Playgroud)