Fallback Observable for RxJava

Ale*_* Fu 13 rx-java

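I'm looking for a better way to implement a simple Observable fallback system for empty results when using RxJava. The idea is that if a local query for a set of data returns zero items, a fallback query (a network call, or something else) should be made instead. Currently, my code consists of the following: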

Observable.create(new Observable.OnSubscribe<Object>() {
  @Override
  public void call(Subscriber<? super Object> subscriber) {
     List<Object> results = queryLocalDatabase();
     if (results.isEmpty()) {
       subscriber.onError(new Throwable("Empty results"));
     } else {
       // Process results...
     }
  }
}).onErrorResumeNext(/* Fallback Observable goes here */);

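While this works, it doesn't make much sense to throw an exception for an empty result set. I noticed that there is a conditional operator available, isEmpty, but it doesn't seem to do what I want. For example, using isEmpty...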

localObservable = Observable.create(new Observable.OnSubscribe<Object>() {
  @Override
  public void call(Subscriber<? super Object> subscriber) {
     List<Object> results = queryLocalDatabase();
     for (Object obj : results) {
       // Process object
       subscriber.onNext(obj);
     }
     subscriber.onCompleted();
  }
});

localObservable.isEmpty()
  .switchMap(new Func1<Boolean, Observable<? extends Object>>() {
    @Override
    public Observable<? extends Object> call(Boolean isEmpty) {
      if (isEmpty) {
        // Return fallback Observable.
        return fallbackObservable;
      }

      // Return original Observable, which would mean running the local query again... Not desired.
      return localObservable;
    }
  });

This almost gets me where I want to go, except that localObservable would apparently be run twice whenever it does emit items, which is a deal breaker.

Arc*_*cao 13

Use the switchIfEmpty operator.

Here is a usage example:

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        // return no item
        //subscriber.onNext(...);
        System.out.println("Generating nothing :)");
        subscriber.onCompleted();
    }
}).switchIfEmpty(Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        System.out.println("Generating item");
        subscriber.onNext("item");
        subscriber.onCompleted();
    }
})).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError");
    }

    @Override
    public void onNext(String s) {
        System.out.println("onNext: " + s);
    }
});

Simplified with lambdas:

Observable.create(subscriber -> {
    // return no item
    //subscriber.onNext(...);
    System.out.println("Generating nothing :)");
    subscriber.onCompleted();
}).switchIfEmpty(Observable.create(subscriber -> {
    System.out.println("Generating item");
    subscriber.onNext("item");
    subscriber.onCompleted();
})).subscribe(
    s -> System.out.println("onNext: " + s),
    throwable -> System.out.println("onError"),
    () -> System.out.println("onCompleted")
);
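
To illustrate why switchIfEmpty avoids the double query from the question, here is a rough plain-Java model of a cold source. None of it is RxJava API: Source, localSource, switchIfEmpty, isEmptyThenSwitch, collect and the localQueryRuns counter are all hypothetical names made up for this sketch, and the model is synchronous and ignores errors, unsubscription and backpressure.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of a cold, synchronous source. This is NOT the RxJava API.
final class FallbackSketch {

    /** Hypothetical stand-in for a cold Observable: every emitTo call redoes the work. */
    interface Source<T> {
        void emitTo(Consumer<T> sink);
    }

    /** Counts how often the pretend local query is executed. */
    static int localQueryRuns = 0;

    /** Builds a source that "queries" the given rows each time it is subscribed to. */
    static Source<String> localSource(List<String> rows) {
        return sink -> {
            localQueryRuns++;
            rows.forEach(sink);
        };
    }

    /** Models switchIfEmpty: one pass over primary, fallback only if nothing was emitted. */
    static <T> Source<T> switchIfEmpty(Source<T> primary, Source<T> fallback) {
        return sink -> {
            boolean[] emitted = {false};
            primary.emitTo(item -> {
                emitted[0] = true;
                sink.accept(item);
            });
            if (!emitted[0]) {
                fallback.emitTo(sink);
            }
        };
    }

    /** Models the isEmpty-then-resubscribe approach from the question. */
    static <T> Source<T> isEmptyThenSwitch(Source<T> primary, Source<T> fallback) {
        return sink -> {
            boolean[] emitted = {false};
            // First run: only checks whether anything is emitted.
            primary.emitTo(item -> emitted[0] = true);
            // Second run: a non-empty primary is executed again.
            (emitted[0] ? primary : fallback).emitTo(sink);
        };
    }

    /** Subscribes once and gathers everything the source emits. */
    static <T> List<T> collect(Source<T> source) {
        List<T> out = new ArrayList<>();
        source.emitTo(out::add);
        return out;
    }

    public static void main(String[] args) {
        Source<String> fallback = sink -> sink.accept("from network");
        List<String> rows = Arrays.asList("a", "b");

        localQueryRuns = 0;
        List<String> viaSwitchIfEmpty = collect(switchIfEmpty(localSource(rows), fallback));
        System.out.println(viaSwitchIfEmpty + ", local query runs: " + localQueryRuns);

        localQueryRuns = 0;
        List<String> viaIsEmpty = collect(isEmptyThenSwitch(localSource(rows), fallback));
        System.out.println(viaIsEmpty + ", local query runs: " + localQueryRuns);

        List<String> noRows = Collections.emptyList();
        System.out.println(collect(switchIfEmpty(localSource(noRows), fallback)));
    }
}
```

In this model both approaches deliver the same items, but the local query counter ends at 1 for the switchIfEmpty variant and at 2 for the isEmpty variant when the local source is non-empty. With an empty local source, only the fallback item is delivered.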