RxJS - 如何使用带有异步可观察数组的toArray()?

Ada*_*dam 3 reactive-programming rxjs

我正在创建一个异步可观察数组,Rx.Observable.create()并希望.toArray()在完成时使用它来获取所有值.

console.log('running');
let valsArray = ['a','b','c'].map((val,i)=>{
  return Rx.Observable.create((obs)=>{
    let tid = setTimeout(()=>{
      console.log(val + ' timing out');
      obs.onNext(val);
    },i*500);
    return ()=>{
      clearTimeout(tid);
    };
  }).publish().refCount();
});

Rx.Observable.from(valsArray)
.flatMap((v)=>v)
.toArray()
.subscribe((arr)=>{
  console.log("arr should be ['a','b','c']",arr);
});
Run Code Online (Sandbox Code Playgroud)

以上示例位于http://jsbin.com/wegoha/10/edit?js,console.

使用setTimeout作为一个独立于其他异步操作,以保持例子简单.

Eni*_*ity 7

代码是正确的,除非您没有完成源可观察量.

toArray()可观察完成时,操作者只能工作,因为你没有完成Rx.Observable.create然后将查询可能永远不会结束.

试试这个:

console.log('running');
let valsArray = ['a','b','c'].map((val,i)=>{
  return Rx.Observable.create((obs)=>{
    let tid = setTimeout(()=>{
      console.log(val + ' timing out');
      obs.onNext(val);
      obs.onCompleted();
    },i*500);
    return ()=>{
      clearTimeout(tid);
    };
  }).publish().refCount();
});

Rx.Observable.from(valsArray)
.flatMap((v)=>v)
.toArray()
.subscribe((arr)=>{
  console.log("arr should be ['a','b','c']",arr);
});
Run Code Online (Sandbox Code Playgroud)

另外,就像旁注一样,.publish().refCount()这里似乎错了.此代码中不需要使源可观察源变热.