具有RxJava支持的Chaining Retrofit服务

Bra*_*ell 7 android netflix square rx-java retrofit

我在使用改造的RxJava支持来链接observable时遇到了麻烦.我可能误解了如何使用它,否则它可能是改造中的一个错误.希望有人在这里可以帮助我了解正在发生的事情.编辑:我正在使用MockRestAdapter进行这些响应 - 这可能是相关的,因为我看到RxSupport实现略有不同.

这是一个虚假的银行应用程序.它正在尝试进行转移,并且在转移完成后,它应该执行帐户请求以更新帐户值.这基本上只是我试用flatMap的借口.遗憾的是,以下代码无效,订阅者无法收到通知:

案例1:链接两个改造生产的可观测量

转移服务(注意:返回改造生产的可观察物):

@FormUrlEncoded @POST("/user/transactions/")
public Observable<TransferResponse> transfer(@Field("session_id") String sessionId,
                                             @Field("from_account_number") String fromAccountNumber,
                                             @Field("to_account_number") String toAccountNumber,
                                             @Field("amount") String amount);
Run Code Online (Sandbox Code Playgroud)

帐户服务(注意:返回改进生成的observable):

@FormUrlEncoded @POST("/user/accounts")
public Observable<List<Account>> getAccounts(@Field("session_id") String sessionId);
Run Code Online (Sandbox Code Playgroud)

将两个改造后的观测链连在一起:

transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount)
            .flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() {
                @Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
                    return accountsService.getAccounts(session.getSessionId());
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
Run Code Online (Sandbox Code Playgroud)

案例2:创建我自己的可观察和链接与改造生产的

如果我忽略了"平面映射"调用中Retrofit内置的Rx支持,它可以完美运行!所有订户都会收到通知 见下文:

新帐户服务(注意:不会产生可观察的):

@FormUrlEncoded @POST("/user/accounts")
public List<Account> getAccountsBlocking(@Field("session_id") String sessionId);
Run Code Online (Sandbox Code Playgroud)

创建我自己的observable并自己发出项目:

transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount)
            .flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() {
                @Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
                    return Observable.create(new Observable.OnSubscribe<List<Account>>() {
                        @Override public void call(Subscriber<? super List<Account>> subscriber) {
                            List<Account> accounts = accountsService.getAccountsBlocking(session.getSessionId());
                            subscriber.onNext(accounts);
                            subscriber.onCompleted();
                        }
                    });
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
Run Code Online (Sandbox Code Playgroud)

任何帮助将不胜感激!

Mig*_*gne 5

答案是肯定的,你应该能够从Retrofit链接observables.MockRestAdapter $ MockRxSupport:createMockObservable私有类中似乎存在一个错误.关于将订阅者订阅到可观察者的调度方式似乎是错误的.在HttpExecutor线程本身启动后,订阅了observable.我相信来自您的Schedulers.io()线程的原始流程已完成并取消订阅,之后可以订阅mockHandler.invokeSync返回的Observable.如果你看一下retrofit-mock模块中的代码,希望这种解释有一些意义.

作为现在使用retrofit-mock的当前代码的解决方法,您可以使用自己的ImmediateExecutor实现替换内部默认Executor.这至少在测试模拟时有一个由Scheduler.io提供的单个线程流.

// ImmediateExecutor.java
public class ImmediateExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}

// Create your RestAdapter with your ImmdiateExecutor
RestAdapter adapter = new RestAdapter.Builder()
            .setEndpoint(endpoint)
            .setExecutors(new ImmediateExecutor(), null)
            .build();
Run Code Online (Sandbox Code Playgroud)

至于在源头修复问题,您还可以在项目中包含retrofit-mock项目作为源代码,并使用下面的代码修改MockRestAdapter $ MockRxSupport:createMockObservable方法.我已经测试了你的用例,它确实解决了这个问题.

--- MockRestAdapter.java $ MockRxSupport ----

Observable createMockObservable(final MockHandler mockHandler, final RestMethodInfo methodInfo,
        final RequestInterceptor interceptor, final Object[] args) {
      return Observable.create(new Observable.OnSubscribe<Object>() {
        @Override public void call(final Subscriber<? super Object> subscriber) {
          try {
            if (subscriber.isUnsubscribed()) return;
            Observable observable =
                (Observable) mockHandler.invokeSync(methodInfo, interceptor, args);

            observable.subscribeOn(Schedulers.from(httpExecutor));

            //noinspection unchecked
            observable.subscribe(subscriber);

          } catch (RetrofitError e) {
            subscriber.onError(errorHandler.handleError(e));
          } catch (Throwable e) {
            subscriber.onError(e);
          }
        }
      });
    }
Run Code Online (Sandbox Code Playgroud)

在这里创建了一个Retrofit项目的问题,我们将看看他们是否接受了它.