使用RxJava和Okhttp

San*_*ack 22 java android rx-java okhttp

我想在另一个线程(如IO线程)中使用okhttp请求一个url并进入ResponseAndroid主线程,但我不知道如何创建一个Observable.

Sae*_*umi 25

首先添加RxAndroid到您的依赖项,然后Observable像这样创建:

 Subscription subscription =   Observable.create(new Observable.OnSubscribe<Response>() {
        OkHttpClient client = new OkHttpClient();
          @Override
          public void call(Subscriber<? super Response> subscriber) {
            try {
              Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
              if (response.isSuccessful()) {
                  if(!subscriber.isUnsubscribed()){
                     subscriber.onNext(response);
                  }
                  subscriber.onCompleted();
              } else if (!response.isSuccessful() && !subscriber.isUnsubscribed()) {
                  subscriber.onError(new Exception("error"));
                }
            } catch (IOException e) {
              if (!subscriber.isUnsubscribed()) {
                  subscriber.onError(e);
              }
            }
          }
        })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Response>() {
              @Override
              public void onCompleted() {

              }

              @Override
              public void onError(Throwable e) {

              }

              @Override
              public void onNext(Response response) {

              }
            });
Run Code Online (Sandbox Code Playgroud)

它会在另一个线程(io线程)中请求你的url并在android主线程上观察它.

最后当你离开屏幕使用时,subsribtion.unsubscribe()以避免内存泄漏.

当你使用时Observable.create,你应该写很多样板代码,你也必须自己处理订阅.更好的选择是使用延迟.形成文件:

在观察者订阅之前不要创建Observable,并为每个观察者创建一个新的Observable

Defer运算符等待观察者订阅它,然后它生成一个Observable,通常带有Observable工厂函数.它为每个订户重新执行此操作,因此尽管每个订户可能认为它订阅了相同的Observable,但实际上每个订户都获得其自己的单独序列.

正如MarcinKoziński所提到的,你只需要这样做:

final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
    @Override public Observable<Response> call() {
        try {
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            return Observable.just(response);
        } catch (IOException e) {
            return Observable.error(e);
        }
    }
});
Run Code Online (Sandbox Code Playgroud)

  • 现在我注意到它比这更糟糕.它还打破了[Observable contract](http://reactivex.io/documentation/contract.html).如果请求不成功,它将发出`onNext`,`onComplete`,然后才会发出`onError`.你应该只发出`onComplete`或`onError`,但不要两者都发出.这可以通过多种方式轻松修复.但这是一个完美的例子,说明为什么难以编写自己的`create()`以及为什么避免它更安全. (3认同)

Mar*_*ski 18

使用它更容易,更安全,Observable.defer()而不是Observable.create():

final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
    @Override public Observable<Response> call() {
        try {
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            return Observable.just(response);
        } catch (IOException e) {
            return Observable.error(e);
        }
    }
});
Run Code Online (Sandbox Code Playgroud)

这样就可以为您处理取消订阅和背压.这是Dan Lew关于create()和的一篇伟大的文章defer().

如果你希望走的Observable.create()路线,那么它应该看起来更像是在该库isUnsubscribed()到处洒电话.我相信这仍然无法处理背压.


FRR*_*FRR 10

我意识到这篇文章有点陈旧,但现在有一种新的,更方便的方法

Observable.fromCallable {
        client.newCall(Request.Builder().url("your url").build()).execute()
    }
Run Code Online (Sandbox Code Playgroud)

更多信息:https://artemzin.com/blog/rxjava-defer-execution-of-function-via-fromcallable/

  • 这不是Java,而是Kotlin (4认同)