RxJava在多个订户之间共享Observable的排放

H3x*_*x0n 7 java android rx-java

我有以下问题:

我有一个可观察的东西正在做一些工作,但是其他的可观察者需要那个observable的输出才能工作.我试图在同一个observable上多次订阅,但在日志中我看到原始的observable被多次启动.

多数民众赞成我的observable创建对象:

Observable.create((Observable.OnSubscribe<Api>) subscriber -> {
            if (mApi == null) {
                //do some work
            }
            subscriber.onNext(mApi);
            subscriber.unsubscribe();
        })
Run Code Online (Sandbox Code Playgroud)

这就是我需要对象的可观察性

loadApi().flatMap(api -> api....()));
Run Code Online (Sandbox Code Playgroud)

我正在使用

.subscribeOn(Schedulers.io()) observable.observeOn(AndroidSchedulers.mainThread())
                .unsubscribeOn(Schedulers.io()
Run Code Online (Sandbox Code Playgroud)

在所有可观察的.

Mal*_*alt 13

我不确定我是否正确理解了你的问题,但我认为你正在寻找一种方法来分享几个订阅者之间可观察的排放量.有几种方法可以做到这一点.首先,您可以像这样使用Connectable Observable:

ConnectableObservable<Integer> obs = Observable.range(1,3).publish();
obs.subscribe(item -> System.out.println("Sub A: " + item));
obs.subscribe(item -> System.out.println("Sub B: " + item));
obs.connect(); //Now the source observable starts emitting items
Run Code Online (Sandbox Code Playgroud)

输出:

Sub A: 1
Sub B: 1
Sub A: 2
Sub B: 2
Sub A: 3
Sub B: 3
Run Code Online (Sandbox Code Playgroud)

或者,您可以使用PublishSubject:

PublishSubject<Integer> subject = PublishSubject.create(); //Create a publish subject
subject.subscribe(item -> System.out.println("Sub A: " + item)); //Subscribe both subscribers on the publish subject
subject.subscribe(item -> System.out.println("Sub B: " + item));    
Observable.range(1,3).subscribe(subject); //Subscribe the subject on the source observable
Run Code Online (Sandbox Code Playgroud)

输出:

Sub A: 1
Sub B: 1
Sub A: 2
Sub B: 2
Sub A: 3
Sub B: 3
Run Code Online (Sandbox Code Playgroud)

这两个示例都是单线程的,但您可以轻松添加observeOn或subscirbeOn调用以使它们异步.