RxJs可观察数组的数组

Spr*_*bua 24 observable rxjs typescript angular

对于在TypeScript中使用Angular2编写的Web应用程序,我需要使用RxJs Observable.
由于我之前从未使用过rxjs,而且我一般都是反应编程的新手,所以我有时候在找到正确的方法来做一些特定的事情时会遇到一些困难.
我现在面临一个问题.在哪里我必须转换Array<Observable<T>>Observable<Array<T>>.
我将尝试用一个例子来解释它:
- 我有一个Observable,它给了我一个Users(Observable<Array<User>>)的列表
- User-class有一个函数getPosts返回一个Observable<Array<Post>>.
- 我需要将a映射Observable<Array<User>>到a Observable<Array<Post>>来评估函数Post内的所有s onNext.

我可以轻松地映射Observable<Array<User>>Observable<Array<Observable<Array<Post>>>>使用
map((result : Array<User>) => result.map((user : User) => user.getPosts()))
ans我可以将一个Array<Array<Post>>变成一个Array<Post>.
但是我只是找不到正确的方法来映射Observable<Array<Observable<Array<Post>>>>到一个Observable<Array<Array<Post>>>
直到现在我combineLatest一起使用该功能flatMap.
对我来说它似乎工作,我使用的编辑器(Atom编辑器)没有显示任何错误.但是现在我使用Netbeans,它在这段代码中显示错误.使用"tsc"编译代码也会导致错误.
看起来像这样:

Argument of type '(result: Post[]) => void' is not assignable to parameter of type 'NextObserver<[Observable<Post>]> | ErrorObserver<[Observable<Post>]> | CompletionObserver<[...'.
Type '(result: Post[]) => void' is not assignable to type '(value: [Observable<Post>]) => void'.  
Run Code Online (Sandbox Code Playgroud)

所以我的问题是:
我如何"扁平化"的ArrayObservablesArray

Thi*_*ier 33

flatMap运营商允许这样做.我不完全明白你尝试做什么,但我会尽力提供答案......

如果你想加载所有

getPostsPerUser() {
  return this.http.get('/users')
    .map(res => res.json())
    .flatMap((result : Array<User>) => {
      return Observable.forkJoin(
        result.map((user : User) => user.getPosts());
    });
}
Run Code Online (Sandbox Code Playgroud)

Observable.forkJoin 允许您等待所有可观察数据都已接收数据.

上面的代码假设user.getPosts()返回一个可观察的...

有了这个,您将收到一系列帖子:

this.getPostsPerUser().subscribe(result => {
  var postsUser1 = result[0];
  var postsUser2 = result[1];
  (...)
});
Run Code Online (Sandbox Code Playgroud)


小智 8

您可以在所需的rxjs方法上使用apply函数,如下所示:

const source1 = Rx.Observable.interval(100)
  .map(function (i) { return 'First: ' + i; });

const source2 = Rx.Observable.interval(150)
  .map(function (i) { return 'Second: ' + i; });

const observablesArray = [source1, source2];

const sources = Rx.Observable.combineLatest
.apply(this, observablesArray).take(4)

/* or you can write it without "apply" this way:
const sources = Rx.Observable
                .combineLatest(...observablesArray)
                .take(4)
*/
sources.subscribe(
  (response) => {
    console.log(response);
  }
)
Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
Run Code Online (Sandbox Code Playgroud)


Pau*_*ard 5

这是使用 rxjs@6.3.3 的一些示例代码

import { of, forkJoin} from 'rxjs';
import { Observable } from 'rxjs'

const source:Array<Observable<String>> = [of('A'), of('B'), of('C') ];

forkJoin(source).subscribe( (x)=> console.log(x))
Run Code Online (Sandbox Code Playgroud)

https://arrayofobservablesexamp.stackblitz.io