如何清理RxJava fire并忘记订阅?

But*_*ink 8 java android rx-java

我有火并且忘记了不需要很长时间的操作,但是我想要在后台反复运行它们.生命周期基本上是与服务器的连接存在或已经过了10秒.我不知道如何存储这些,以便我可以有效地清理它们,同时仍然保持正确的生命周期.

Subscription fireAndForget = service.doSomething()
                                    .timeout(10, TimeUnit.SECONDS)
                                    .subscribe( () -> {
                                        otherService.doAction()
                                    }, (e) -> { Log.e("error", e) });
// what do I do with fireAndForget???

// If I use a CompositeSubscription
subscriptions.add(fireAndForget) // how does it get removed????
Run Code Online (Sandbox Code Playgroud)

我可以在我的连接上有一个CompositeSubscription,它可以保留它们,但是当操作完成后如何清除它们呢?在订阅取消订阅时,我是否应该打扰清除CompositeSubscription?我对Rx比较陌生,所以我不确定我是不是只是想做一些Rx不打算做的事情.

cyr*_*xis 8

提示:火与

如果你保持订阅,你不会忘记.只需致电订阅并继续前进.

service.doSomething()
    .timeout(10, TimeUnit.SECONDS)
    .subscribe( () -> {
        otherService.doAction()
    }
Run Code Online (Sandbox Code Playgroud)

编辑:垃圾收集.

除非你做一些非常奇怪的事情(比如使用WeakReference),否则你执行的代码doSomething将阻止整个链被垃圾收集.

想象RxJava就像一个洋葱,每当你变换一个observable(map,doOnNext等)时,就会创建一个新的Observable,它像洋葱一样包裹旧的observable.

对于每个转换,Subscriber都会创建一个new ,它将回调(onNext,onCompleted,onError)传递给Subscriber链中的下一个.

a的主要原因Subscription是你可以打电话unsubscribe.您可能想要取消订阅有两个原因.

  1. 你有一个热的观察.基本上这个可观察者将永远发出价值.一个例子可以是一个可观察的,每30秒发出一次.unsubscribe当您不再对时间价值感兴趣时,您可以致电.

  2. 您想取消长时间的操作.假设您正在加载一个网页以显示给用户,如果用户按下,您想要取消加载(您不再关心结果).

地图操作的示例源代码

Map只lift使用a OperatorMap作为参数调用.Lift Observable基于a创建一个新的Operator,暂时可以忽略.

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}
Run Code Online (Sandbox Code Playgroud)

OperatorMap创建一个新的Subscriber,大多只是将调用传递给Subscriber它给出的任何东西.这可能是Subscriber你传入subscribeSubscriber由另一个map转换创建,它并不重要.

public final class OperatorMap<T, R> implements Operator<R, T> {

    private final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, this, t);
                }
            }

        };
    }

}
Run Code Online (Sandbox Code Playgroud)