RxJava:在onNext中调用取消订阅

Woj*_*ela 10 java rx-java

我想知道unsubscribe从这样的onNext处理程序中调用是否合法:

List<Integer> gatheredItems = new ArrayList<>();

Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    public void onNext(Integer item) {
        gatheredItems.add(item);
        if (item == 3) {
            unsubscribe();
        }
    }
    public void onCompleted() {
        // noop
    }
    public void onError(Throwable sourceError) {
        // noop
    }
};

Observable<Integer> source = Observable.range(0,100);

source.subscribe(subscriber);
sleep(1000);
System.out.println(gatheredItems);
Run Code Online (Sandbox Code Playgroud)

上面的代码正确输出只收集了四个元素:[0, 1, 2, 3].但是如果有人将源observable更改为缓存:

Observable<Integer> source = Observable.range(0,100).cache();
Run Code Online (Sandbox Code Playgroud)

然后收集所有一百个元素.我没有对source observable的控制(无论是否缓存),那么如何从内部取消订阅onNext呢?

顺便说一句:那么在不onNext正确的事情中取消订阅是做什么的?

(我的实际用例是,onNext我实际上是在写输出流,当IOException发生时,没有什么可以写入输出,所以我需要以某种方式停止进一步处理.)

aka*_*okd 3

一旦第一个订阅者到达,cache() 就会缓存所有内容,并且不会提供任何阻止下游订阅者的选项。您需要在cache()之前停止流以避免过多的保留:

Observable<Integer> source = Observable.range(1, 100);

PublishSubject<Integer> stop = PublishSubject.create();

source
.doOnNext(v -> System.out.println("Generating " + v))
.takeUntil(stop)
.cache()
.doOnNext(new Action1<Integer>() {
    int calls;
    @Override
    public void call(Integer t) {
        System.out.println("Saving " + t);
        if (++calls == 3) {
            stop.onNext(1);
        }
    }
})
.subscribe();
Run Code Online (Sandbox Code Playgroud)

编辑:上面的示例在 1.0.13 以下不起作用,所以这里的版本应该:

SerialSubscription ssub = new SerialSubscription();

ConnectableObservable<Integer> co = source
        .doOnNext(v -> System.out.println("Generating " + v))
        .replay();

co.doOnNext(v -> {
    System.out.println("Saving " + v);
    if (v == 3) {
        ssub.unsubscribe();
    }
})
.subscribe();

co.connect(v -> ssub.set(v));
Run Code Online (Sandbox Code Playgroud)