onNext和onComplete的行为

Kun*_*uno 11 android reactive-programming rx-java

我有一个Observable,无需发出值即可完成某些操作.我还有一个我希望Observable可以使用的对象列表.因此对于此列表中的所有元素:doSomething()

Observable.from(uris)
        .flatMap(new Func1<Uri, Observable<Void>>() {
            @Override
            public Observable<Void> call(Uri uri) {
                return createDoSomethingObservable(uri);
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribe(new Observer<Void>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "completed");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Void aVoid) {
                Log.d(TAG, "next");
            }
        });
Run Code Online (Sandbox Code Playgroud)

以及创建Observable的方法:

Observable<Void> createDoSomethingObservable(final Uri uri) {
    return Observable.create(new Observable.OnSubscribe<Void>() {
        @Override
        public void call(Subscriber<? super Void> subscriber) {
            //doSomething
            subscriber.onNext(null);
            subscriber.onCompleted();
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

现在,当我使用带有3个元素的List运行时,我得到:

next
next
next
completed
Run Code Online (Sandbox Code Playgroud)

这很好,因为这就是我想要的,但我不知道它为什么会起作用.首先我开始调用onComplete,因为最终observable完成了它的工作并完成了.但当然onNext永远不会在订阅者上调用.反过来也是如此.

所以我的问题是:

  1. 为什么onComplete只调用最后一个列表元素?
  2. 有没有更好的方法来解决这个问题?

Ada*_*m S 7

onComplete被调用的是最后一个元素,因为这是在chain(from(uris))中最早的observable 完成的时候.

预计您发出的可观测量flatMap将会调用onComplete.一旦完成(并且call已经返回),则from可以继续进行下一次发射.一旦from完成发出可观察量,它就会调用onComplete并且有效地完成链.


Ale*_*der 5

我认为,小代码可以帮助您理解onNext( )和的 行为onComplete().
假设,你有一个List<Uri>.让我们Observable<Uri> 手动转换它.

public static Observable<Uri> getUries(List<Uri> uriList){
    return Observable.create(new Observable.OnSubscribe<Uri>() {
        @Override
        public void call(Subscriber<? super Uri> subscriber) {
            for(Uri uri : uriList){
                subscriber.onNext(uri);
            }

            subscriber.onCompleted();
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

或者使用Lambda表达式:

public static Observable<Uri> getUries(List<Uri> uriList){
    return Observable.create(subscriber -> {
        for(Uri uri : uriList){
            subscriber.onNext(uri);
        }

        subscriber.onCompleted();
    });
}
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,我们正在迭代输入列表,并调用onNext( )每个元素,当我们完成转换ListObservable,我们调用了onComplete()

PS 这段代码只是一个演示,请,从来没有使用它来transfor ListObservable.使用运算符Observable.from().

更新:

运营商from( )实施:

...
while (true) {
    if (o.isUnsubscribed()) {
        return;
    } else if (it.hasNext()) {
        o.onNext(it.next());
    } else if (!o.isUnsubscribed()) {
        o.onCompleted();
        return;
    } else {
        // is unsubscribed
        return;
    }
}
...
Run Code Online (Sandbox Code Playgroud)

链接:https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java#L75-L87