如何从Observable.from中收集发射值的数组?

tom*_*234 7 asynchronous reactive-programming observable rxjs angular

所以在Rxjs中,我有很多代码,

return Observable.from(input_array)
           .concatMap((item)=>{
               //this part emits an Observable.of<string> for each item in the input_array
           })
           .scan((output_array:string[],each_item_output_array:string)=>{
               return output_array.push(each_item_output_array) ;
           });
Run Code Online (Sandbox Code Playgroud)

但显然这是错误的,扫描将破坏concatMap中的代码,所以我想知道如何收集可观察from运算符中每个项的输出数组?

car*_*ant 33

在您的呼叫中,scan您没有为累加器指定种子.在那种情况下,第一个值用作种子.例如:

Rx.Observable
  .from(["a", "b", "c"])
  .scan((acc, value) => acc + value)
  .subscribe(value => console.log(value));
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Run Code Online (Sandbox Code Playgroud)

在您的代码段中,第一个值不是数组,因此您无法调用push它.要将值累积到数组中,可以指定如下的数组种子:

Rx.Observable
  .from(["a", "b", "c"])
  .concatMap(value => Rx.Observable.of(value))
  .scan((acc, value) => {
    acc.push(value);
    return acc;
  }, []) // Note that an empty array is use as the seed
  .subscribe(value => console.log(JSON.stringify(value)));
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Run Code Online (Sandbox Code Playgroud)

虽然,对于某些用例,最好不要改变数组:

Rx.Observable
  .from(["a", "b", "c"])
  .concatMap(value => Rx.Observable.of(value))
  .scan((acc, value) => [...acc, value], [])
  .subscribe(value => console.log(JSON.stringify(value)));
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Run Code Online (Sandbox Code Playgroud)

请注意,scan它为它接收的每个值发出一个数组.如果您只想在observable完成时发出一个数组,则可以使用toArray运算符:

Rx.Observable
  .from(["a", "b", "c"])
  .concatMap(value => Rx.Observable.of(value))
  .toArray()
  .subscribe(value => console.log(JSON.stringify(value)));
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Run Code Online (Sandbox Code Playgroud)

  • 如果我可以投票一百次,我会。toArray 解决了我一整天都在研究的问题。谢谢! (3认同)

Ole*_*nko 5

小心这段代码:

      const obs = Rx.Observable
      .from(["a", "b", "c"])
      .concatMap(value => Rx.Observable.of(value))
      .scan((acc, value) => {
        acc.push(value);
        return acc;
      }, []); 
      obs.subscribe(value => console.log(JSON.stringify(value)));
      obs.subscribe(value => console.log(JSON.stringify(value)));
Run Code Online (Sandbox Code Playgroud)

结果会有点出乎意料:

 ["a"]
 ["a","b"]
 ["a","b","c"]
 ["a","b","c","a"]
 ["a","b","c","a","b"]
 ["a","b","c","a","b","c"]
Run Code Online (Sandbox Code Playgroud)

“acc”变量是引用对象,每个订阅者获取流数据并将数据再次添加到同一对象。可能有很多解决方案可以避免它,这是在再次接收流数据时创建新对象:

    var obs = Rx.Observable
          .from(["a", "b", "c"])
          .concatMap(value => Rx.Observable.of(value))
          .scan((acc, value) => {
          //clone initial value
            if (acc.length == 0) {

                   acc = [];
                }
            acc.push(value);
            return acc 
          }, []); // Note that an empty array is use as the seed
          obs.subscribe(value => console.log(JSON.stringify(value)));
          obs.subscribe(value => console.log(JSON.stringify(value)));
Run Code Online (Sandbox Code Playgroud)

结果符合预期:

 ["a"]
 ["a","b"]
 ["a","b","c"]
 ["a"]
 ["a","b"]
 ["a","b","c"]
Run Code Online (Sandbox Code Playgroud)

我希望它能为某人节省很多时间