用RxJava中的observable替换回调

Pab*_*rra 17 java rx-java

我使用侦听器作为回调来观察Android的异步操作,但我认为用RxJava替换这个侦听器可能很棒,我是新的使用这个库,但我真的很喜欢它,我总是在Android项目中使用它.

这是我重构的代码:

public void getData( final OnResponseListener listener ){
   if(data!=null && !data.isEmpty()){
       listener.onSuccess();
   }
   else{
       listener.onError();
   }
}
Run Code Online (Sandbox Code Playgroud)

一个简单的回调:

public interface OnResponseListener {
   public void onSuccess();
   public void onError(); 
}
Run Code Online (Sandbox Code Playgroud)

而"观察者":

object.getData( new OnResponseListener() {
    @Override
    public void onSuccess() {
       Log.w(TAG," on success");
    }

    @Override
    public void onError() {
       Log.e(TAG," on error");
    }
});
Run Code Online (Sandbox Code Playgroud)

谢谢!

YMY*_*YMY 18

例如,您可以使用Observable.fromCallable为您的数据创建observable.

public Observable<Data> getData(){
    return Observable.fromCallable(() -> {
        Data result = null;
        //do something, get your Data object
        return result;
    });
}
Run Code Online (Sandbox Code Playgroud)

然后使用你的数据

 getData().subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(data -> {
                //do something with your data
            }, error -> {
                //do something on error
            });
Run Code Online (Sandbox Code Playgroud)

使用rxjava 1.x和lambda表达式.

编辑:

如果我理解你,你想要替换那个监听器,而不是把它包装成可观察的.我在参考你的评论时添加了其他例子.哦.. 如果你只期待一件商品,也应该使用Single.

public Single<Data> getData() {
        return Single.create(singleSubscriber -> {
            Data result = object.getData();
            if(result == null){
                singleSubscriber.onError(new Exception("no data"));
            } else {
                singleSubscriber.onSuccess(result);
            }
        });
    }

getData().subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(data -> {
                //do something with your data
            }, error -> {
                //do something on error
            });
Run Code Online (Sandbox Code Playgroud)


Mak*_*dov 7

您正在寻找Completable.create:

Completable:表示没有任何值的延迟计算,但仅表示完成或异常.该类遵循与Reactive-Streams类似的事件模式:onSubscribe(onError | onComplete)?

Completable.create(subscriber -> {
    object.getData(new OnResponseListener() {
        @Override
        public void onSuccess() {
           subscriber.onCompleted();
        }

        @Override
        public void onError() {
           subscriber.onError(* put appropriate Throwable here *);
        }
    }
})
...//apply Schedulers
.subscribe((() -> *success*), (throwable -> *error*));
Run Code Online (Sandbox Code Playgroud)


Geo*_*izy 6

我将如何重构您的代码;与 getData 方法一起,我将添加包装为 Single 的 getData 方法:

public void getData( final OnResponseListener listener ){
    if(data!=null && !data.isEmpty()){
        listener.onSuccess();
    }
    else{
        listener.onError();
    }
}

public Single<Boolean> getDataSingle() {
    return Single.create(new SingleOnSubscribe<Boolean>() {
        @Override
        public void subscribe(SingleEmitter<Boolean> e) throws Exception {
            getData(new OnResponseListener() {
                @Override
                public void onSuccess() {
                    e.onSuccess(true);
                }

                @Override
                public void onError() {
                    e.onSuccess(false);
                }
            });
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

或者使用 Java 8:

public Single<Boolean> getDataSingle() {
    return Single.create(e -> getData(
            new OnResponseListener() {
                @Override
                public void onSuccess() {
                    e.onSuccess(true);
                }

                @Override
                public void onError() {
                    e.onSuccess(false);
                }
            })
    );
}
Run Code Online (Sandbox Code Playgroud)

现在,您已经在回调 API 旁边公开了一个 Rx API。假设它是您自己的某种 DataProvider,您现在可以在不处理回调的情况下使用它,如下所示:

dataProvider.getDataSingle()
        .map(result -> result ? "User exist" : "User doesn't exist")
        .subscribe(message -> display(message));
Run Code Online (Sandbox Code Playgroud)

我使用 Rx2 但使用 Rx1 逻辑是相同的。

我还使用了Single而不是 Observable,因为您只等待一个值。兴趣是您功能的更具表现力的契约。

您不能代表 Observable 发出值,即调用 myObservable.send(value) 之类的东西。第一个解决方案是使用Subject。另一种解决方案(上面的一个)是使用 Observable.create()(或 Single.create())创建 observable。您调用回调方法并在 Observable.create() 方法内创建侦听器,因为在 Observable.create() 内您可以调用 onSuccess() 方法,该方法告诉 Observable 传递值。

这是我用来将回调包装成可观察的。一开始有点复杂,但很容易适应。

我再给你举个例子,正如你所问的。假设您想将 EditText 的更改显示为 Snackbar:

View rootView;
EditText editTextView;

//Wrap Android addTextChangedListener into an Observable
Observable<String> textObservable = Observable.create(consumer ->
        editTextView.addTextChangedListener(new TextWatcher() {
            @Override
            public void beforeTextChanged(CharSequence s, int start, int count, int after) {

            }

            @Override
            public void onTextChanged(CharSequence s, int start, int before, int count) {

            }

            @Override
            public void afterTextChanged(Editable s) {
                consumer.onNext(s.toString());
            }
        })
);

//Use it
textObservable.subscribe(text -> Snackbar.make(rootView, text, Snackbar.LENGTH_SHORT).show());
Run Code Online (Sandbox Code Playgroud)