How to send only the last request with Rx and Retrofit?

sha*_*agi 5 android rx-java retrofit rx-android

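I have an EditText view with a TextWatcher, and in the onTextChanged method I have to query the server for results based on the EditText field. In my presenter I use Rx, but I need to delay the search until the user has finished typing. This is what I have so far: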

service.getData(query)
            .delaySubscription(REQUEST_DELAY_FROM_SERVER, TimeUnit.MILLISECONDS, Schedulers.io())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    data-> {
                        getViewState().showData(data);
                    },
                    error -> {
                        Log.e(this.getClass().getSimpleName(), error.getMessage(), error);
                    }
            );

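But delaySubscription does not work as expected. It collects all the calls and sends each of them after the delay. I need the same behavior as if I had used handler.postDelayed(), so that only one request is sent.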

ULa*_*ins 7

Edit 2:

Sample presenter in RxJava 2:

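import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;

class Presenter {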
    private PublishSubject<String> queryPublishSubject = PublishSubject.create();

    public Presenter() {
        queryPublishSubject
                .debounce(1000, TimeUnit.MILLISECONDS)
                // You might want to skip empty strings
                .filter(new Predicate<CharSequence>() {
                    @Override
                    public boolean test(CharSequence charSequence) {
                        return charSequence.length() > 0;
                    }
                })
                // Switch to IO thread for network call and flatMap text input to API request
                .observeOn(Schedulers.io())
                .flatMap(new Function<CharSequence, Observable<...>>() {
                    @Override
                    public Observable<...> apply(final CharSequence charSequence) {
                        return ...; // Call API
                    }
                })
                // Receive and process response on Main thread (if you need to update UI)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(...);
    }

    public void onSearchTextChanged(String query) {
        queryPublishSubject.onNext(query);
    }
}

Edit 1:

The same code in RxJava 1:

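import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

class Presenter {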
    private PublishSubject<String> queryPublishSubject = PublishSubject.create();

    public Presenter() {
        queryPublishSubject
            .debounce(1000, TimeUnit.MILLISECONDS)
            // You might want to skip empty strings
            .filter(new Func1<CharSequence, Boolean>() {
                @Override
                public Boolean call(CharSequence charSequence) {
                    return charSequence.length() > 0;
                }
            })
            // Switch to IO thread for network call and flatMap text input to API request
            .observeOn(Schedulers.io())
            .flatMap(new Func1<CharSequence, Observable<...>>() {
                @Override
                public Observable<...> call(final CharSequence charSequence) {
                    return ...; // Call API
                }
            })
            // Receive and process response on Main thread (if you need to update UI)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(...);
    }

    public void onSearchTextChanged(String query) {
        queryPublishSubject.onNext(query);
    } 
}  

Original answer (using RxBinding and RxJava 1)

The correct answer is to use debounce, but besides that there are a few other tricks that might be useful:

textChangeListener = RxTextView
    .textChanges(queryEditText)
    // as far as I know, subscription to textChanges is allowed from Main thread only
    .subscribeOn(AndroidSchedulers.mainThread()) 
    // On subscription Observable emits current text field value. You might not need that
    .skip(1) 
    .debounce(1000, TimeUnit.MILLISECONDS)
    // You might want to skip empty strings
    .filter(new Func1<CharSequence, Boolean>() {
        @Override
        public Boolean call(CharSequence charSequence) {
            return charSequence.length() > 0;
        }
    })
    // Switch to IO thread for network call and flatMap text input to API request
    .observeOn(Schedulers.io())
    .flatMap(new Func1<CharSequence, Observable<...>>() {
        @Override
        public Observable<...> call(final CharSequence charSequence) {
            return ...; // Call API
        }
    })
    // Receive and process response on Main thread (if you need to update UI)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(...);
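
If it helps to see what debounce does without Rx, here is a minimal plain-Java sketch of the same idea, roughly what the handler.postDelayed() approach from the question would do by hand. Each new value cancels the call that is still waiting and schedules a fresh one, so only the last value within the quiet period triggers a request. The Debouncer class, its method names, and the 200 ms delay are illustrative assumptions, not part of RxJava, RxBinding, or Retrofit.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper for illustration only; not an RxJava or Android class.
class Debouncer<T> {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final long delayMillis;
    private final Consumer<T> action;
    private ScheduledFuture<?> pending; // guarded by "this"

    Debouncer(long delayMillis, Consumer<T> action) {
        this.delayMillis = delayMillis;
        this.action = action;
    }

    // Cancel the call that is still waiting, then schedule the newest value.
    synchronized void onNext(T value) {
        if (pending != null) {
            pending.cancel(false);
        }
        pending = scheduler.schedule(
                () -> action.accept(value), delayMillis, TimeUnit.MILLISECONDS);
    }

    // Stop accepting values and wait for the one remaining scheduled call.
    // With the executor's default policy, a delayed task that was not
    // cancelled still runs after shutdown().
    void shutdown() {
        scheduler.shutdown();
        try {
            scheduler.awaitTermination(delayMillis + 1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Simulates three quick keystrokes; the list stands in for "requests sent".
    static List<String> demo() {
        List<String> sent = new CopyOnWriteArrayList<>();
        Debouncer<String> debouncer = new Debouncer<String>(200, sent::add);
        debouncer.onNext("a");
        debouncer.onNext("ab");
        debouncer.onNext("abc");
        debouncer.shutdown();
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [abc]
    }
}
```

In the Rx versions above, debounce(1000, TimeUnit.MILLISECONDS) plays the role of this class, and the subject's onNext(query) corresponds to onNext(value) here.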