订阅Vs订阅RxJava2(Android)?

bas*_*i82 43 android reactive-programming rx-java rx-java2

何时调用subscribeWith方法而不是普通订阅?什么是用例?

compositeDisposable.add(get()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe(this::handleResponse, this::handleError));
Run Code Online (Sandbox Code Playgroud)

VS

   compositeDisposable.add(get()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
              //  .subscribe(this::handleResponse, this::handleError);
                .subscribeWith(new DisposableObserver<News>() {
                    @Override public void onNext(News value) {
                        handleResponse(value);
                    }

                    @Override public void onError(Throwable e) {
                        handleError(e);
                    }

                    @Override public void onComplete() {
                       // dispose here ? why? when the whole thing will get disposed later
                       //via  compositeDisposable.dispose();  in onDestroy();
                    }
                }));
Run Code Online (Sandbox Code Playgroud)

谢谢


稍后添加

根据文档,两者都返回一次性SingleObserver实例:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <E extends SingleObserver<? super T>> E subscribeWith(E observer) {
    subscribe(observer);
    return observer;
}

@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) {
    ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ConsumerSingleObserver<T> s = new ConsumerSingleObserver<T>(onSuccess, onError);
    subscribe(s);
    return s;
}
Run Code Online (Sandbox Code Playgroud)

ConsumerSingleObserver类实现SingleObserver和Disposable的位置.

mma*_*ouf 33

可观察的#订阅说明:

在您的第一个代码段中:

.subscribe(this :: handleResponse,this :: handleError));

您实际上正在使用几种重载Observable#subscribe方法之一:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
Run Code Online (Sandbox Code Playgroud)

还有另一个也可以Action执行onComplete:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete) {
Run Code Online (Sandbox Code Playgroud)

另一个选项允许您简单地传入一个Observer (注意:void方法) (编辑2 - 定义此方法ObservableSource,这是Observable扩展的接口.)

public final void subscribe(Observer<? super T> observer)
Run Code Online (Sandbox Code Playgroud)

在你的问题的第二个代码片段中,你使用了subscribeWith简单地返回Observer你传入的方法(为了方便/缓存等):

public final <E extends Observer<? super T>> E subscribeWith(E observer)
Run Code Online (Sandbox Code Playgroud)

观察者#onComplete解释:

在Observable发出流中的所有项后,将调用Observer#onComplete.来自java doc:

/**
 * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
 * <p>
 * The {@link Observable} will not call this method if it calls {@link #onError}.
 */
void onComplete();
Run Code Online (Sandbox Code Playgroud)

因此,例如,如果get()您的代码片段中返回了一个Observable发出多个News对象的内容,那么每个对象都会被处理掉Observer#onNext.在这里,您可以处理每个项目.

在它们全部被处理之后(假设没有发生错误),那么onComplete将被调用.知道您已经处理了所有News对象,您可以执行任何需要执行的额外操作(例如,更新UI).

不要混淆Disposable#dispose当可观察流结束(完成/错误)时调用哪个,或者你手动终止观察(这是CompositeDisposable因为它可以帮助你Disposable一次性处理它包含的所有内容).

如果在您的场景get()中将返回Observable仅发出单个项目,而不是使用a Observable,请考虑使用io.reactivex.Single仅处理一个项目(in onSuccess)的位置,并且不需要Action为onComplete 指定:)

编辑:回复您的评论:

但是我仍然没有使用subscribeWith,你说它通过观察者进行缓存等,它传递到哪里?完成?从我所理解的订阅中,实际上并没有消费可观察(或单一)的权利?

为了进一步澄清subscribeWith解释,我的意思是它消耗Observer你传入的对象subscribeWith(与subscribe方法完全一样),但是它会另外将相同的Observer返回给你.在撰写本文时,subscribeWith的实现是:

public final <E extends Observer<? super T>> E subscribeWith(E observer) {
    subscribe(observer);
    return observer;
}
Run Code Online (Sandbox Code Playgroud)

因此,subscribeWith 可以互换使用subscribe.

你能举例说明subscribeWith的用例吗?我想这完全可以回答这个问题

subscribeWithjavadoc中给出了下面的使用示例:

Observable<Integer> source = Observable.range(1, 10);
CompositeDisposable composite = new CompositeDisposable();

ResourceObserver<Integer> rs = new ResourceObserver<>() {
     // ...
};

composite.add(source.subscribeWith(rs));
Run Code Online (Sandbox Code Playgroud)

在这里看到的用法subscribeWith将返回ResourceObserver实例化的同一个对象.这样可以方便地执行订阅并将其添加ResourceObserverCompositeDisposable一行中(注意ResourceObservable实现)Disposable.

编辑2回复第二条评论.

source.subscribeWith(RS); source.subscribe(RS); 两者都返回SingleObserver实例,

ObservableSource#subscribe(Observer <? super T> observer) 不会返回Observer.它是一个void方法(参见上面Observable#subscribe解释下的注释.)而Observable#subscribeWith DOES返回Observer.如果我们使用ObservableSource#subscribe相反的方法重写示例使用代码,我们必须在两行中这样做:

source.subscribe(rs); //ObservableSource#subscribe is a void method so nothing will be returned
composite.add(rs);
Run Code Online (Sandbox Code Playgroud)

而这种Observable#subscribeWith方法使我们只需一行便可完成上述操作composite.add(source.subscribeWith(rs));

它可能会让所有重载的订阅方法看起来有点混乱,但是存在差异(其中一些是微妙的).查看代码和文档有助于区分它们.


编辑3 subscribeWith的另一个示例用例

subscribeWith当您具有Observer可能要重用的特定实现时,该方法很有用.例如,在上面的示例代码中,它提供了ResourceObserver订阅中的特定实现,从而继承了它的功能,同时仍允许您处理onNext onError和onComplete.

另一个示例用法:对于您问题中的示例代码,如果您想get()在多个位置对响应执行相同的订阅,该怎么办?

不是Consumer在不同的类中复制onNext和onError 的实现,而是可以为例如定义一个新类.

//sample code..
public class GetNewsObserver extends DisposableObserver<News> {
    //implement your onNext, onError, onComplete.
    ....
}
Run Code Online (Sandbox Code Playgroud)

现在,无论何时执行该get()请求,您都可以通过以下方式订阅:

compositeDisposable.add(get()
    ...
    .subscribeWith(new GetNewsObserver()));
Run Code Online (Sandbox Code Playgroud)

现在看代码很简单,你保持处理响应的责任分离,现在可以GetNewsObserver在任何你想要的地方重用它.

  • 在编辑3中添加了另一个示例用法. (2认同)
  • 这一切都取决于用例.如果您要在一个地方订阅并提供onNext,onError的唯一实现,那么使用带有lambdas的#subscribe是有意义的.否则,如果您需要在各种不同的类中重用常见的Observer代码,例如我在edit3中给出的示例,或者喜欢使用来自javadoc的ResourceObservable示例,那么您可以使用#subscribeWith.用最好的方法解决问题:) (2认同)