RxJava-检查条件并仅在条件为true时重复一次

And*_*nik 4 java android reactive-programming rx-java

我曾经RxJava + Retrofit在Android应用中进行API调用。在某些情况下,用户发出请求并且其令牌已过期。在这种情况下,我会在中收到正常响应onNext,但响应中不包含结果,而是包含一些代码的error元素。如果发生这种情况,我需要重新登录用户,并且仅在获得新令牌后才重复原始请求。所以我想用来组织这个RxJava。为了使事情变得容易,我将举一个简单的例子。假设我有以下方法:

public void test(int someInt){
    Observable.just(someInt)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    log("onCompleted");
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                    log("onError");
                }

                @Override
                public void onNext(Integer integer) {
                    log("onNext - " + integer);
                }
            });
Run Code Online (Sandbox Code Playgroud)

我想检查if (someInt == 0) 之前 onNext()被称为。如果我得到,false我想继续并被onNext()调用,但是如果我得到,true我想执行一些操作并仅重复一次原始可观察的,如果条件第二次返回false,我就不想再重复一次。

有人可以帮我弄清楚我对此有什么选择吗?PS我是RX世界的新手。

Dio*_*lor 6

干得好。因为您想重试整个链,所以.retryWhen非常适合它,因此您必须“玩弄”一些错误。

在下面,如果您检测到无效的令牌,则会传递一个错误(仅在第一次),retryWhen将捕获该错误并重新订阅整个rx链(从开始Observable.just(someInt))。

 haveRetriedOnce = false;

 Observable.just(someInt)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .flatMap(integer ->{
          if(integer == 0){
            if(haveRetriedOnce){
               return Observable.error(new UserOperationException());
            }
            // problem, throw an error and the .retryWhen will catch it
            return Observable.error(new InvalidTokenException());
          }else{
            return Observable.just(integer);
          }
        })
        .retryWhen(observable -> observable.flatMap(throwable->{
          if(throwable instanceOf InvalidTokenException){
            haveRetriedOnce = true;
            return just(0); // retry, the int here is irrelevant
          }else{
            // other error, pass it further
            return Observable.error(throwable);
          }
        }))
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                log("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
                log("onError");
            }

            @Override
            public void onNext(Integer integer) {
                log("onNext - " + integer);
            }
        }
Run Code Online (Sandbox Code Playgroud)