RxJS:在NodeJS中处理groupBy和Observable.fromEvents

sfr*_*hse 2 reactive-programming node.js rxjs

我对RxJS印象非常深刻,并且真正开始研究它.但是,我的以下nodejs代码至少对我来说不能按预期工作.

let events = new EventEmitter();
let source = Rx.Observable.fromEvent( events, 'data' );

source
    .groupBy( event => event.type )
    .flatMap( group => group.reduce( ( acc, cur ) => _.merge( acc, cur ), [] ) )
    .subscribe( ( data ) => {
        console.log( data );
    } );


events.emit( 'data', { 'type': 1, msg: 'Test 1' } );
events.emit( 'data', { 'type': 1, msg: 'Test 2' } );
events.emit( 'data', { 'type': 2, msg: 'Test 3' } );
Run Code Online (Sandbox Code Playgroud)

我希望subscribe产生一些输出

Kwi*_*enP 5

这是我在JSBin http://jsbin.com/cufana/edit?js,console中创建的稍微改变的版本.

我在你的代码中看到的问题是你有一个无法完成的observable.如果您正在执行groupBy,则只要源observable未完成,就不会推送内部保留的结果(即分组结果).

let events = new Rx.Subject();

events
    .groupBy( event => event.type)
    .flatMap(group => group.reduce((acc, curr) => [...acc, curr], []))
    .subscribe( ( data ) => {
        console.log( data );
    } );


events.next({ 'type': 1, msg: 'Test 1' } );
events.next({ 'type': 1, msg: 'Test 2' } );
events.next({ 'type': 2, msg: 'Test 3' } );
events.complete();
Run Code Online (Sandbox Code Playgroud)

在这里你可以看到我已经将eventemitter更改为一个主题,以获得一个没有angular2依赖的工作jsbin.我正在完成这个主题,所以我的来源可以从小组中观察完成.这将推动结果通过.

其余代码非常正确.

如果EventEmitter确实是来自Angular2,那么我猜完你就会遇到问题.这可以从子组件完成吗?