RxJava:当其中一个流不发出任何信号时如何处理combineLatest()

Cas*_*ngs 6 java android rx-java rx-android rx-java2

我使用 mergeLatest() 来组合 3 个可观察数据流。所有这些组合在一起,以便同时显示 UI 中的所有数据。现在,有一种情况,其中一个可观察量不会发出任何内容,因为获取的数据可能为空。

是否有 RxJava 运算符让订阅者知道不会因为空数据而发出任何信号?

编辑

private fun retrieveData() {
    Observable.combineLatest(getCurrentUser.execute(), getLatestGoal.execute(), getLatestLog.execute(),
            Function3<User, Goal, Log, PersonalViewModel> { user, goal, log -> mapToViewModel(user, goal, log) })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSubscribe { /*todo: animation*/ }
            .doOnNext { view.setViewModel(it) }
            .doOnComplete { view.stopLoading() }
            .doOnError { /*todo: error message*/ }
            .subscribe()
}
Run Code Online (Sandbox Code Playgroud)

第三个流:当用户有 nog 日志时,getLatestLog.execute() 不发出任何内容。当该流不发出时,整个视图将不可见。

数据是从 FireBase 实时数据库中获取的。ChildEventListener 有一个如下所示的方法:

override fun onChildAdded(dataSnapshot: DataSnapshot?, p1: String?) {
                val log = dataSnapshot?.getValue(Log::class.java)
                log?.let { subscriber.onNext(it) }
                subscriber.onComplete()
                firebaseDatabase.reference.removeEventListener(this)
            }
Run Code Online (Sandbox Code Playgroud)

Han*_*rst 1

如果您手边有 Java8 或一些可选选项,您可以使用以下构造:

  @Test
  void name() {
    TestScheduler scheduler = new TestScheduler();

    Observable<Optional<Integer>> o1$ =
        Observable.just(Optional.ofNullable(4)).mergeWith(Observable.never());
    Observable<Optional<Integer>> o2$ =
        Observable.just(Optional.ofNullable(2)).mergeWith(Observable.never());

    Observable<Optional<Integer>> o3$ =
        Observable.<Optional<Integer>>never()
            .timeout(1000, TimeUnit.MILLISECONDS, scheduler)
            .onErrorResumeNext(
                throwable -> {
                  return Observable.<Optional<Integer>>never()
                      .mergeWith(Observable.just(Optional.empty()));
                });

    Observable<Tuple3<Optional<Integer>, Optional<Integer>, Optional<Integer>>> result =
        Observable.combineLatest(
                o1$,
                o2$,
                o3$,
                (integer, integer2, integer3) -> Tuple.of(integer, integer2, integer3))
            .filter(t -> t._1.isPresent() && t._2.isPresent() && t._3.isPresent());

    TestObserver<Tuple3<Optional<Integer>, Optional<Integer>, Optional<Integer>>> test =
        result.test();

    scheduler.advanceTimeTo(10000, TimeUnit.SECONDS);

    test.assertNotComplete().assertNoErrors().assertNoValues();
  }
Run Code Online (Sandbox Code Playgroud)

正如您可能不会的那样,不允许通过 observables-pipelines 发出 null 值。因此我们需要一些其他的构造来表示 null。在Java8中,有一个称为Optional的构造(vavr也将其称为Option -> Java8)。

在这个例子中,o3$-Observable 不会发出任何东西。它也可能会出错,也许这更类似于你的情况。我们将捕获错误(在本例中:超时异常)并返回一个带有Optional.empty的Observable。

在组合回调中,我们组合了所有三个值。在后面的步骤中,我们过滤掉所有具有有效值的元组(带有值的可选)。

当所有三个值都已发出一个值时,您只会得到一个发出的值。

当您不能使用Optional-class时,您还可以定义一个INVALID-Object,如下例所示:

class So51217041 {
  private static Integer INVALID_VALUE = 42;

  @Test
  void name() {
    Observable<Integer> o1$ = Observable.just(4).mergeWith(Observable.never());
    Observable<Integer> o2$ = Observable.just(2).mergeWith(Observable.never());

    Observable<Integer> o3$ =
        Observable.<Integer>never()
            .onErrorResumeNext(
                throwable -> {
                  return Observable.<Integer>never().mergeWith(Observable.just(INVALID_VALUE));
                });

    Observable<Tuple3<Integer, Integer, Integer>> result =
        Observable.combineLatest(
                o1$,
                o2$,
                o3$,
                (integer, integer2, integer3) -> Tuple.of(integer, integer2, integer3))
            .filter(t -> t._3 != INVALID_VALUE); // yeah I know, I want to compare reference, not the content

    TestObserver<Tuple3<Integer, Integer, Integer>> test = result.test();

    test.assertNotComplete().assertNoErrors().assertNoValues();
  }
}
Run Code Online (Sandbox Code Playgroud)

另外,当您希望流以 INVALID 或 NULL 开头时,CombineLatest 至少发出一个值,您可以使用 Observable#startWith(INVALID) 或 Observable#startWith(Optional.empty())。这将保证可观察对象至少会发出一个值。