使用回调/侦听器链接RxJava observable

hiB*_*Lee 9 android reactive-programming observable rx-java rx-android

我正在使用Obtrables的Retrofit,并希望链接可观察量.通常它适用于像map()或之类的函数flatMap(),因为api返回执行任务的Observable.但在这种情况下,我必须做以下事情:

  1. 来自的getKey() api
  2. 使用另一个库中的返回键Foo并等待调用回调.
  3. 当回调返回时,将结果发送给api.

我希望这是一个单一的链接调用,所以我只需要订阅一次.我猜我可以使用merge()或者join()什么,但不确定处理回调的最佳方法是什么.

有没有办法让这更好?这是我到目前为止:

api.getKey().subscribe(new Action1<String>() {
   @Override
   public void call(String key) {
      Foo foo = new Foo();
      foo.setAwesomeCallback(new AwesomeCallback() {
         @Override
         public void onAwesomeReady(String awesome) {
            api.sendAwesome(awesome)
                    .subscribe(new Action1<Void>() {
                       @Override
                       public void call(Void aVoid) {
                           handleAwesomeSent();
                       }
                    });
         }
      });
      foo.makeAwesome();
   }
});
Run Code Online (Sandbox Code Playgroud)

dav*_*ola 17

适应clemp6r的解决方案,这是另一个既不需要Subjects也不需要嵌套的解决方案Subscriptions:

api.getKey().flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String key) {

        return Observable.create(new Observable.OnSubscribe<String>(){

            @Override
            public void call(final Subscriber<? super String> subscriber) {
                Foo foo = new Foo();
                foo.setAwesomeCallback(new AwesomeCallback() {
                    @Override
                    public void onAwesomeReady(String awesome) {
                        if (! subscriber.isUnsubscribed()) {
                            subscriber.onNext(awesome);
                            subscriber.onComplete();
                        }
                    }
                });
                foo.makeAwesome();
            } 
        });
}).flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String awesome) {
        return sendAwesome(awesome);
   }
}).subscribe(new Action1<Void>() {
    @Override
    public void call(Void aVoid) {
        handleAwesomeSent();
    }
});
Run Code Online (Sandbox Code Playgroud)

总的来说,我认为它总是可能要包装一个在任何基于回调的异步操作Observable使用Observable.create().