Getting java.io.InterruptedIOException when implementing instant search on Android with RxJava2 and Retrofit

Abh*_*ari 1 java android retrofit2 rx-java2

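I am trying to implement instant search with RxJava2 and Retrofit. The flow is simple: whenever the user changes the text, I call publish.onNext() (publish is a PublishSubject object). I added the filter, debounce, and switchMap operators so that the server is queried only when the text meets a length threshold, and so that consecutive inputs do not trigger simultaneous calls.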

Here is the code:

subject = PublishSubject.create();
    getCompositeDisposable().add(subject
            .filter(s -> s.length() >= 3)
            .debounce(300,
                    TimeUnit.MILLISECONDS)
            .switchMap(s -> getDataManager().getHosts(
                    getDataManager().getDeviceToken(),
                    s).observeOn(Schedulers.io()))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribe(hostResponses -> {
                getMvpView().hideEditLoading();
                if (hostResponses.size() != 0) {
                    if (this.hostResponses != null)
                        this.hostResponses.clear();
                    this.hostResponses = hostResponses;
                    getMvpView().setHostView(getHosts(hostResponses));
                } else {
                    getMvpView().onFieldError("No host found");
                }

            }, throwable -> {
                getMvpView().hideEditLoading();
                if (throwable instanceof HttpException) {
                   HttpException exception = (HttpException)throwable;
                    if (exception.code() == 401) {
                        getMvpView().onError(R.string.code_expired,
                                BaseUtils.TOKEN_EXPIRY_TAG);
                    }
                }

            })
    );

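My code works and does what I need, but I get an error when I type a long string and then press the backspace button: while the text of the AutoCompleteTextView is being cleared, an exception is thrown.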

Here is the stack trace of the exception:

java.io.InterruptedIOException: thread interrupted  
at okio.Timeout.throwIfReached(Timeout.java:145)  
at okio.Okio$1.write(Okio.java:76)  
at okio.AsyncTimeout$1.write(AsyncTimeout.java:180)  
at okio.RealBufferedSink.flush(RealBufferedSink.java:216)  
at okhttp3.internal.http1.Http1Codec.finishRequest(Http1Codec.java:166)  
at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:84) 
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)  
at com.facebook.stetho.okhttp3.StethoInterceptor.intercept(StethoInterceptor.java:56)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)  
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)  
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)  
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)  
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:125)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)  
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)  
at okhttp3.RealCall.execute(RealCall.java:77)  
at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)  
at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:41)  
at io.reactivex.Observable.subscribe(Observable.java:10700)  
at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)  
at io.reactivex.Observable.subscribe(Observable.java:10700)  
at io.reactivex.internal.operators.observable.ObservableObserveOn.subscribeActual(ObservableObserveOn.java:45)  
at io.reactivex.Observable.subscribe(Observable.java:10700)  
at io.reactivex.internal.operators.observable.ObservableSwitchMap$SwitchMapObserver.onNext(ObservableSwitchMap.java:126)  
at io.reactivex.observers.SerializedObserver.onNext(SerializedObserver.java:111)  
at io.reactivex.internal.operators.observable.ObservableDebounceTimed$DebounceTimedObserver.emit(ObservableDebounceTimed.java:140)  
at io.reactivex.internal.operators.observable.ObservableDebounceTimed$DebounceEmitter.run(ObservableDebounceTimed.java:165)  
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)  
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)  
at java.util.concurrent.FutureTask.run(FutureTask.java:237)  
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)  
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)  
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)  
at java.lang.Thread.run(Thread.java:762)

aka*_*okd 6

The observeOn(Schedulers.io()) inside switchMap doesn't look right, given that you move the elements back to the main thread right after it. It should be subscribeOn(Schedulers.io()) there.

Also remove the subscribeOn() call just before subscribe: since the chain subscribes to a PublishSubject at the top, it should have no practical effect.

.switchMap(s -> getDataManager()
               .getHosts(getDataManager().getDeviceToken(), s)
//             .observeOn(Schedulers.io())
               .subscribeOn(Schedulers.io())   // <-------------------------
)
.observeOn(AndroidSchedulers.mainThread())
//.subscribeOn(Schedulers.io())   // <--------------------------------------
.subscribe(hostResponses -> {
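Why moving the call off the debounce thread helps: the stack trace shows RealCall.execute running inside DebounceEmitter.run, so the blocking Retrofit call executes on the debounce timer thread. observeOn only changes where results are delivered, not where the inner source is subscribed. When another keystroke arrives, debounce cancels its pending timer task, and that cancellation appears to interrupt the thread while it is still inside the request. Below is a minimal plain-JDK sketch of that mechanism, not RxJava's actual implementation; the class DebounceInterruptDemo, the method blockingCallInterrupted, and the sleep standing in for the HTTP call are all hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DebounceInterruptDemo {

    // Schedules a simulated blocking call from a "debounce timer" task, then
    // cancels that task with interruption, as a newer keystroke would.
    // Returns true if the blocking call itself was interrupted.
    static boolean blockingCallInterrupted(boolean offload) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ExecutorService io = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        Runnable blockingCall = () -> {
            started.countDown();
            try {
                Thread.sleep(200); // stands in for a blocking network request
            } catch (InterruptedException e) {
                interrupted.set(true); // the analogue of InterruptedIOException
            } finally {
                finished.countDown();
            }
        };

        // offload == false: the timer thread runs the call itself (observeOn case).
        // offload == true: the timer thread only hands the call to another
        // thread (roughly what subscribeOn(Schedulers.io()) arranges).
        Runnable timerTask;
        if (offload) {
            timerTask = () -> io.execute(blockingCall);
        } else {
            timerTask = blockingCall;
        }

        try {
            Future<?> pending = timer.schedule(timerTask, 10, TimeUnit.MILLISECONDS);
            started.await();      // wait until the call is in flight
            pending.cancel(true); // cancel the timer task, interrupting its thread
            finished.await();     // wait for the call to end either way
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            timer.shutdownNow();
            io.shutdownNow();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("call on timer thread interrupted: " + blockingCallInterrupted(false));
        System.out.println("offloaded call interrupted: " + blockingCallInterrupted(true));
    }
}
```

With offload set to false the simulated call is interrupted, which corresponds to the original chain. With offload set to true the call runs on its own thread, and cancelling the timer task no longer reaches it. In the real chain, switchMap still disposes the outdated inner request when a new query arrives.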