使用RxJava连锁两个改造的可观测量

loc*_*ost 26 java android reactive-programming rx-java retrofit

我想一个接一个地执行2个网络呼叫.两个网络调用都返回Observable.第二呼叫从所述第一呼叫的成功的结果使用数据,在第二个呼叫的成功的结果从方法使用数据两者第二呼叫的所述第一的成功的结果和.此外,我应该能够处理这两个 onError的"事件"是不同的.我怎样才能避免回调地狱,如下例所示:

       API().auth(email, password)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<AuthResponse>() {
                @Override
                public void call(final AuthResponse authResponse) {
                    API().getUser(authResponse.getAccessToken())
                            .subscribe(new Action1<List<User>>() {
                                @Override
                                public void call(List<User> users) {
                                    doSomething(authResponse, users);
                                }
                            }, new Action1<Throwable>() {
                                @Override
                                public void call(Throwable throwable) {
                                    onErrorGetUser();
                                }
                            });
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    onErrorAuth();
                }
            });
Run Code Online (Sandbox Code Playgroud)

我知道zip,但我想避免创建"Combiner类".

更新1. 试图实现akarnokd的答案:

         API()
            .auth(email, password)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap(authResponse -> API()
                    .getUser(authResponse.getAccessToken())
                    .doOnError(throwable -> {
                        getView().setError(processFail(throwable));
                    }), ((authResponse, users) -> {
                // Ensure returned user is the which was authenticated
                if (authResponse.getUserId().equals(users.get(0).getId())) {
                    SessionManager.getInstance().initSession(email, password, authResponse.getAccessToken(), users.get(0));
                    getView().toNews();
                } else {
                    getView().setError(R.string.something_went_wrong);
                }
            }));
Run Code Online (Sandbox Code Playgroud)

但是里面flatMap的方法编译器说,它无法解析authResponse和用户(的方法authResponse.getAccessToken(),users.get(0)等等).我是rx编程和lambdas的新手 - 请告诉我这是什么问题.无论如何代码现在看起来更干净.

更新2.

API()
            .auth(email, password)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError(throwable -> getView().setError(processFail(throwable)))
            .flatMap((AuthResponse authResponse) -> API()
                    .getUser(authResponse.getAccessToken())
                    .doOnError(throwable -> getView().setError(processFail(throwable))), ((AuthResponse authResponse, List<User> users) -> {
                            // Ensure returned user is the which was authenticated
                            if (authResponse.getUserId().equals(users.get(0).getId())) {
                                SessionManager.getInstance().initSession(email, password, authResponse.getAccessToken(), users.get(0));
                                getView().toNews();
                            }
                            return Observable.just(this);
            }));
Run Code Online (Sandbox Code Playgroud)

这样做了,但现在我的网络电话根本没有执行.

Ant*_* R. 15

你看过flatMap()吗?如果你厌恶它(或zip())是需要创建一个不必要的类只是为了保存两个对象,android.util.Pair可能是一个答案.不过,我不确定如何准确地获得您正在寻找的错误处理.

       API().auth(email, password)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .flatMap(new Func1<AuthResponse, Observable<List<User>>>() {
          @Override
          public Observable<List<User>> call(AuthResponse authResponse) {
            return API().getUser(authResponse.getAccessToken());
          }
        }, new Func2<AuthResponse, List<User>, Pair<AuthResponse, List<User>>>() {
          @Override
          public Pair<AuthResponse, List<User>> call(AuthResponse authResponse, List<User> users) {
            return new Pair<>(authResponse, users);
          }
        }).subscribe(new Action1<Pair<AuthResponse, List<User>>>() {
          @Override
          public void call(Pair<AuthResponse, List<User>> pair) {
            doSomething(pair.first, pair.second);
          }
        }, new Action1<Throwable>() {
          @Override
          public void call(Throwable throwable) {
            // not sure how to tell which one threw the error
          }
        });
Run Code Online (Sandbox Code Playgroud)


aka*_*okd 10

除了Anthony R.的答案之外,还有一个flatMap重载,它接受一个Func2并为你配对你的主要值和扁平值.另外,查看onErrorXXX和onExceptionXXX运算符以进行错误操作,并将它们与第一个和第二个Observables链接起来

first.onErrorReturn(1)
.flatMap(v -> service(v).onErrorReturn(2), (a, b) -> a + b);
Run Code Online (Sandbox Code Playgroud)