将Observable收集到List中似乎不会立即发出集合

Kau*_*pal 7 java frp rx-java

我正在使用RxJava来实质上收集单独发出的Observable列表,并将它们组合成一个Observable列表(基本上与flatMap相反).这是我的代码:

        // myEvent.findMemberships() returns an Observable<List<Membership>>

        myEvent.findMemberships()
             .flatMap(new Func1<List<Membership>, Observable<User>>() {
               @Override
               public Observable<User> call(List<Membership> memberships) {
                 List<User> users = new ArrayList<User>();
                 for (Membership membership : memberships) {
                   users.add(membership.getUser());
                 }
                 return Observable.from(users);
               }
             })
             .toList()
             .subscribeOn(Schedulers.newThread())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Observer<List<User>>() {
               @Override
               public void onCompleted() { }

               @Override
               public void onError(Throwable e) {
                 Timber.e(e, "Error when trying to get memberships");
               }

               @Override
               public void onNext(List<User> users) {
                 Timber.d("%d users emitted", users.size());
               }
             })
Run Code Online (Sandbox Code Playgroud)

我注意到我的onNext永远不会被调用.我似乎无法理解这一点.如果我删除.toList调用并基本输出各个用户(如下所示),它通过发出每个项目来工作.

subscriptions //
    .add(currentEvent.findMemberships()
             .flatMap(new Func1<List<Membership>, Observable<User>>() {
               @Override
               public Observable<User> call(List<Membership> memberships) {
                 List<User> users = new ArrayList<User>();
                 for (Membership membership : memberships) {
                   users.add(membership.getUser());
                 }
                 return Observable.from(users);
               }
             })
             .subscribeOn(Schedulers.newThread())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Observer<User>() {
               @Override
               public void onCompleted() { }

               @Override
               public void onError(Throwable e) {
                 Timber.e(e, "Error when trying to get memberships");
               }

               @Override
               public void onNext(User user) {
                 Timber.d("%d users emitted", user.getName());
               }
             }));
Run Code Online (Sandbox Code Playgroud)

Q1.我对.toList的理解不正确吗?

Q2.如何将单独发射的Observable<Object>s 流组合成一个Observable<List<Object>>

**编辑

@kjones完全解决了这个问题.我没有用findMemberships电话调用onComplete.我在下面添加了代码段.我的真实用例有一些更多的转换,这就是为什么我需要使用.toList调用.正如@zsxwing也正确地指出的那样,对于这个用例,一个简单的地图就足够了.

public Observable<List<Membership>> findMemberships() {
return Observable.create(new Observable.OnSubscribe<List<Membership>>() {
  @Override
  public void call(Subscriber<? super List<Membership>> subscriber) {
    try {
      // .....
      List<Membership> memberships = queryMyDb();

      subscriber.onNext(memberships);

      // BELOW STATEMENT FIXES THE PROBLEM ><
      // subscriber.onCompleted();

    } catch (SQLException e) {
      // ...
    }
  }
});
Run Code Online (Sandbox Code Playgroud)

}

kjo*_*nes 20

看起来myEvent.findMemberships()调用返回的Observable 永远不会调用onComplete.你能展示这段代码吗?

如果是这种情况,它会解释您所看到的行为.在.toList()发出所有项目之前,不会发出列表(由onComplete发出信号).

您的第二个版本.toList()将继续如下:

.findMemberships()
    emits a single List<Membership>
.flatMap()
    transforms List<Membership> into a single List<User>
    Observable.from(users) creates an observable that emits each user
.subscribe()
    onNext() is called for each user
    onCompleted() is never called.
Run Code Online (Sandbox Code Playgroud)

你的原始版本:

.findMemberships()
    emits a single List<Membership>
.flatMap()
    transforms List<Membership> into a single List<User>
    Observable.from(users) creates an observable that emits each user
.toList()
    buffers each User waiting for onCompleted() to be called
    onCompleted is never called because the .findMemberships Observable never completes
Run Code Online (Sandbox Code Playgroud)

有几种解决方案:

1)使用findMemberShips()ObCoable调用onComplete.如果返回的Observable findMemberShips()是一个Rx主题(PublishSubject,BehaviorSubject等),这可能是不可取的.

2)使用Observable.just()而不是Observable.from().你已经有一个List<User>.flatMap(),只需返回它.使用Observable.from(users)创建一个发出每个用户的Observable.Observable.just(users)会创建一个发出单一的Observable List<User>.没必要.toList().

3)使用.map()而不是.flatMap().再一次,没有必要.toList().因为每个List<Membership>变换成List<User>你只需要使用.map().

myEvent
    .findMemberships()
    .map(new Func1<List<Membership>, List<User>>() {
        @Override
        public List<User> call(List<Membership> memberships) {
            List<User> users = new ArrayList<User>();
            for (Membership membership : memberships) {
                users.add(membership.getUser());
            }
            return users;
         }
    })
Run Code Online (Sandbox Code Playgroud)