在RxJava中,我无法从flatMap发出onComplete

Fra*_*Lee 4 rx-java

我有一个问题,当我使用flatmap时,我的Observable链不会终止.

我把我的例子归结为:

int count = Observable.just(1,2,3)
        .flatMap(s -> Observable.<Integer>create(subscr-> {
            subscr.onNext(s);
            if(s>2) {
                subscr.onCompleted();
            }
        }))
        .doOnEach(a->System.err.println(a.getKind()+" -> "+a.toString()))
        .count()
        .toBlocking()
        .first();
    System.err.println("Count: "+count);
Run Code Online (Sandbox Code Playgroud)

我希望'doOnEach'报告三个onNext事件,然后是onCompleted,最后链应该终止.

但是,输出是这样的:

OnNext -> [rx.Notification@c9e6d24d OnNext 1]
OnNext -> [rx.Notification@c9e6d24e OnNext 2]
OnNext -> [rx.Notification@c9e6d24f OnNext 3]
Run Code Online (Sandbox Code Playgroud)

(然后继续挂)

如果我删除flatMap运算符:

int count = Observable.just(1,2,3)
        .doOnEach(a->System.err.println(a.getKind()+" -> "+a.toString()))
        .count()
        .toBlocking()
        .first();
System.err.println("Count: "+count);
Run Code Online (Sandbox Code Playgroud)

......它的工作方式完全符合预期:

CREATED!
OnNext -> [rx.Notification@e3598bd9 OnNext 1]
OnNext -> [rx.Notification@e3598bda OnNext 2]
OnNext -> [rx.Notification@e3598bdb OnNext 3]
OnCompleted -> [rx.Notification@3834d63f OnCompleted]
Count: 3
Run Code Online (Sandbox Code Playgroud)

我想我做错了什么(我无法想象这种基本场景中的错误),但我没有看到它.

任何帮助表示赞赏......(我正在使用RxJava 1.1.0)

zsx*_*ing 8

问题在于以下几行:

        if(s>2) {
            subscr.onCompleted();
        }
Run Code Online (Sandbox Code Playgroud)

Observables s == 1并且s == 2不会发出onCompleted.由于flatMaponCompletedObservable完成所有s 时才会发出,因此onCompleted在您的情况下不会发出.