Chaining observables depending on the result

Ric*_*ard 6 android reactive-programming rx-java rx-android

I am a complete beginner with rx-java and rx-android. I have heard that the learning curve is quite steep at the beginning.

I am trying to replace all of my Eventbus-based code with a safer alternative by using rx-android.

I have set up this snippet to create an observable from the text-change events of an EditText:

MainActivity:

RxUtils.createEditTextChangeObservable(txtInput).throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()).subscribe(new Action1<EditText>() {
            @Override
            public void call(EditText editText) {
                searchStopResultFragment.query(editText.getText().toString());
            }
        });

RxUtils:

public static Observable<EditText> createEditTextChangeObservable(final EditText editText){
        return Observable.create(new Observable.OnSubscribe<EditText>() {
            @Override
            public void call(final Subscriber<? super EditText> subscriber) {
                editText.addTextChangedListener(new TextWatcher() {
                    @Override
                    public void beforeTextChanged(CharSequence s, int start, int count, int after) {

                    }

                    @Override
                    public void onTextChanged(CharSequence s, int start, int before, int count) {

                    }

                    @Override
                    public void afterTextChanged(Editable s) {
                        if (subscriber.isUnsubscribed()) return;
                        subscriber.onNext(editText);
                    }
                });
            }
        });
    }

SearchStopResultFragment:

public void query(String query){
        lastQuery = query;
        resultObservable = StopProvider.getStopResultObservable(getActivity().getContentResolver(),query);
        subscription = resultObservable.subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<List<Stop>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(List<Stop> stops) {
                if(!lastQuery.equals("")) {

                    if(stops.size()>0) {

                        ArrayList<AdapterItem> items = adapter.getItems();
                        items.clear();

                        for (Stop stop : stops) {
                            SearchResultStopItem item = new SearchResultStopItem(stop, SearchResultStopItem.STOP);
                            items.add(item);

                        }

                        adapter.setItems(items);
                        adapter.notifyDataSetChanged();
                    }else{
                      // DO ANOTHER ASYNC QUERY TO FETCH RESULTS
                    }
                }else{
                    showStartItems();
                }
            }
        });
    }

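It feels like I am doing this wrong. I create a new observable from the query method in my fragment on every text-change event. I also want to start a new async lookup operation based on the result of StopProvider.getStopResultObservable (see the comment in the code).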

Any thoughts?

dav*_*ola 2

Here is what I came up with:

RxUtils.createEditTextChangeObservable(txtInput)
.throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
.map(EXTRACT_STRING)
.filter(STRING_IS_NOT_EMPTY)
.concatMap(new Func1<String, Observable<Pair<String,List<Stop>>>>() {

    @Override
    public Observable<Pair<String, List<Stop>>> call(final String query) {

        return StopProvider.getStopResultObservable(getContentResolver(), query)
        .map(new Func1<List<Stop>, Pair<String, List<Stop>>>() {
            // I think this map is a bit more readable than the
            // combineLatest, and since "query" should not be changing
            // anyway, the result should be the same (you have to
            // declare it as final in the method signature, though)
            @Override
            public Pair<String, List<Stop>> call(List<Stop> stops) {
                return new Pair<String, List<Stop>>(query, stops);
            }
        });
    }
})
.concatMap(new Func1<Pair<String, List<Stop>>, Observable<List<Stop>>>() {

    @Override
    public Observable<List<Stop>> call(Pair<String, List<Stop>> queryAndStops) {
        if (queryAndStops.second.size() == 0) {
            return RestClient.service().locationName(queryAndStops.first)
                       .map(new Func1<LocationNameResponse, List<Stop>>() {

                            @Override
                            public List<Stop> call(LocationNameResponse locationNameResponse) {
                                // since there was no if-else in your original code (you were always
                                // just wrapping the List in an Observable) I removed that, too
                                return locationNameResponse.getAddresses();
                            }
            });
        } else {
            return Observable.just(queryAndStops.second);
        }
    }
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.compose(this.<List<Stop>>bindToLifecycle())
.subscribe(new Action1<List<Stop>>() {
    @Override
    public void call(List<Stop> stops) {
        // since I don't know what your API is returning I think
        // it's safer to keep this check in:
        if (stops != null) {
            searchStopResultFragment.showStops(stops);
        } else {
            searchStopResultFragment.showStartItems();
        }
    }
},
new Action1<Throwable>() {
    @Override
    public void call(Throwable throwable) {
        showError(throwable);
    }
});

where EXTRACT_STRING and STRING_IS_NOT_EMPTY are defined as:

public static final Func1<EditText, String> EXTRACT_STRING = new Func1<EditText, String>() {

    @Override
    public String call(EditText editText) {
        return editText.getText().toString();
    }
};

public static final Func1<String, Boolean> STRING_IS_NOT_EMPTY = new Func1<String, Boolean>() {

    @Override
    public Boolean call(String string) {
        return !string.isEmpty();
    }
};
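If the first three operators of the chain (throttleLast, map, filter) are hard to picture, here is a rough plain-Java simulation of what they let through. This is not RxJava: the class ThrottleLastSketch, the Timed type and the method name are made up for illustration, and it assumes the 200 ms windows start at time 0 and that the events arrive in time order, which glosses over the real scheduler timing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ThrottleLastSketch {

    /** A typed text value together with the time (in ms) at which it appeared. */
    public static final class Timed {
        final long timeMs;
        final String text;

        public Timed(long timeMs, String text) {
            this.timeMs = timeMs;
            this.text = text;
        }
    }

    /**
     * Keeps only the last text seen in each window of windowMs milliseconds
     * (roughly what throttleLast does), then drops empty strings (the filter step).
     * Assumes events are sorted by time.
     */
    public static List<String> throttleLastThenFilter(List<Timed> events, long windowMs) {
        // Later events overwrite earlier ones that fall into the same window.
        Map<Long, String> lastPerWindow = new LinkedHashMap<>();
        for (Timed event : events) {
            lastPerWindow.put(event.timeMs / windowMs, event.text);
        }
        List<String> result = new ArrayList<>();
        for (String text : lastPerWindow.values()) {
            if (!text.isEmpty()) {
                result.add(text);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Timed> typed = Arrays.asList(
                new Timed(10, "b"), new Timed(60, "bu"), new Timed(150, "bus"),
                new Timed(250, ""),
                new Timed(420, "t"), new Timed(590, "tr"));
        System.out.println(throttleLastThenFilter(typed, 200)); // prints [bus, tr]
    }
}
```

Only "bus" and "tr" would reach the concatMap, so the provider is queried twice instead of six times, and the cleared field (the empty string) never triggers a query at all.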

So this at least removes the need to return Observable.just(null) and then check for it further down the chain.
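If you want to play with the "fall back to a second lookup only when the first one comes back empty" idea outside of Android, here is a minimal sketch that uses the JDK's CompletableFuture instead of RxJava. thenCompose plays a role similar to the second concatMap above. localLookup and remoteLookup are hypothetical stand-ins for StopProvider.getStopResultObservable and the REST call, and stops are plain strings to keep it short.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FallbackLookupSketch {

    // Hypothetical stand-in for the local provider query:
    // it only knows stops whose name starts with "Main".
    static CompletableFuture<List<String>> localLookup(final String query) {
        return CompletableFuture.supplyAsync(() -> {
            if (query.startsWith("Main")) {
                return Arrays.asList("Main Street");
            }
            return Collections.<String>emptyList();
        });
    }

    // Hypothetical stand-in for the remote fallback lookup.
    static CompletableFuture<List<String>> remoteLookup(final String query) {
        return CompletableFuture.supplyAsync(() -> Arrays.asList("remote:" + query));
    }

    // Chain the second async call only when the first result is empty.
    // A non-empty result is passed through as an already completed future,
    // so no null sentinel has to travel down the chain.
    public static CompletableFuture<List<String>> findStops(final String query) {
        return localLookup(query).thenCompose(stops ->
                stops.isEmpty()
                        ? remoteLookup(query)
                        : CompletableFuture.completedFuture(stops));
    }

    public static void main(String[] args) {
        System.out.println(findStops("Main").join()); // prints [Main Street]
        System.out.println(findStops("Elm").join());  // prints [remote:Elm]
    }
}
```

The branch that wraps the existing list corresponds to Observable.just(queryAndStops.second) in the chain above: both branches return the same async type, so the subscriber only ever sees a list of stops.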