RxAndroid:创建简单的 Hot Observable

Car*_*ley 7 android observable rx-java rx-android

我正在创建一个 Observable,它在订阅时发出整数。我现在的实现是这样设置的,所以订阅它的行为从一开始就触发了生成,如下所示:

private Observable createObservable() {
    return Observable.create (
        new Observable.OnSubscribe<Integer>() {

            @Override
            public void call(Subscriber<? super Integer> sub) {

                for (int i = 1; i < MAX_PROGRESS + 1; i++) {
                    sub.onNext(i);
                    SystemClock.sleep(1000);
                }
                sub.onCompleted();
            }
        }
    );
}
Run Code Online (Sandbox Code Playgroud)

我的理解是这是一个冷 Observable。我希望生成序列而不管任何订阅者,并且当订阅者订阅时,希望他们接收在订阅时恰好是当前的值。IOW,把它变成一个热门的 Observable。我宁愿不将 Observable 子类化,因为这会将它绑定到一个具体的 Integer 中,而实际上实际类型会有所不同。

Geo*_*ell 4

查看rx.subjects.BehaviorSubject<T>。如果您不熟悉rx.subjects.Subjects,我认为描述它们的最通用方式是它们打破了 A 点和 B 点之间订阅的连续性Observer<T>。可以接受onNext()来自多个源的 s (警告:需要外线程安全)。另一方面,主题也是一个Observable<T>,因此多个Observer<T>s 可以订阅,并且onNext()进来的 s 将被多播到每个下游Observer<T>

如果你的代码看起来像

Observable<T> src = ...;
Subscriber<T> dst;
src.subscribe(dst);
Run Code Online (Sandbox Code Playgroud)

使用BehaviorSubject的方法是

Observable<T> src = ...;
BehaviorSubject<T> subject = BehaviorSubject.create(defaultValue);
src.subscribe(subject);
Run Code Online (Sandbox Code Playgroud)

立即订阅来源,主题的发布速度与发布的速度一样快。BehaviourSubject 只保留最近的值并删除 defaultValue 和所有以前的值。

// safe to do multiple times.
Subscriber<T> dst;
subject.subscribe(dst);
Run Code Online (Sandbox Code Playgroud)

订阅时,订阅后立即从(或)dst接收最新值,然后接收所有后续值,直到取消订阅。srcdefaultValuedst

警告:对象有过度使用的倾向,因此请确保您需要一个。