RxJava,一个可观察的多个订阅者:publish().autoConnect()

FRR*_*FRR 14 reactive-programming rx-java

我正在玩rxJava/rxAndroid,并且有一些非常基本的东西不像我期望的那样.我有一个可观察的和两个订阅者:

Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: "+ integer));

Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: "+ integer));
Run Code Online (Sandbox Code Playgroud)

这是输出:

D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3
Run Code Online (Sandbox Code Playgroud)

现在,我知道我可以通过使用避免重复计数,publish().autoConnect()但我试图首先理解这种默认行为.每当有人订阅observable时,它就会开始发出数字序列.我明白了.因此,当Subscriber 1连接时它开始发射物品.Subscriber 2马上连接,为什么不能获得价值呢?

这是我理解它的方式,从可观察的角度来看:

  1. 有人订阅了我,我应该开始发出物品
    [订阅者:1] [项目到EMIT:1,2,3]

  2. 向订户发出项目"1"
    [订阅者:1] [项目到EMIT:2,3]

  3. 有人订阅了我,当我完成后我将再次发出1,2,3
    [订阅者:1&2] [项目到EMIT:2,3,1,2,3]

  4. 向订户发出项目'2'
    [订阅者:1&2] [项目到EMIT:3,1,2,3]

  5. 向订户发出项目'3'
    [订阅者:1&2] [项目到EMIT:1,2,3]

  6. 向订户发出"1"项
    [订阅者:1&2] [项目到EMIT:2,3]

  7. ...

但这不是它的工作原理.就像它们是两个独立的可观察者一样.这让我很困惑,他们为什么不把这些物品交给所有订户呢?

奖金:

这是如何publish().autoConnect()解决问题的?让我们分解吧.publish()给了我一个可连接的观察.可连接的observable就像一个常规的observable,但你可以告诉它何时连接.然后我继续通过电话告诉它立即连接autoConnect()

通过这样做......难道我没有得到与我开始时相同的东西吗?一个普通的常规观察.操作员似乎互相取消.

我可以闭嘴使用publish().autoconnect().但我想更多地了解可观察的工作方式.

谢谢!

Mat*_*Bos 15

这是因为实际上这是两个独立的可观察者.当你调用时,它们会"生成" subscribe().因此,您提供的步骤不正确,因为步骤3和4只是1和2,但是在不同的可观察量上.

但是你会看到它们为1 1 1 2 2 2,因为日志记录发生了.如果您要移除observeOn()零件,那么您将以交织的方式看到排放物.要查看下面的运行代码:

@Test
public void test() throws InterruptedException {
    final Scheduler single = Schedulers.single();
    final long l = System.nanoTime();
    Observable<Long> dataStream =
            Observable.just(1, 2, 3)
                    .map(i -> System.nanoTime())
                    .subscribeOn(Schedulers.computation());
                    //.observeOn(single);

    dataStream.subscribe(i -> System.out.println("1  " + Thread.currentThread().getName() + " " + (i - l)));
    dataStream.subscribe(i -> System.out.println("2  " + Thread.currentThread().getName() + " " + (i - l)));

    Thread.sleep(1000);
}
Run Code Online (Sandbox Code Playgroud)

输出,至少在我的运行中是(注意线程名称):

1  RxComputationThreadPool-1 135376988
2  RxComputationThreadPool-2 135376988
1  RxComputationThreadPool-1 135486815
2  RxComputationThreadPool-2 135537383
1  RxComputationThreadPool-1 135560691
2  RxComputationThreadPool-2 135617580
Run Code Online (Sandbox Code Playgroud)

如果你申请observeOn()它变成:

1  RxSingleScheduler-1 186656395
1  RxSingleScheduler-1 187919407
1  RxSingleScheduler-1 187923753
2  RxSingleScheduler-1 186656790
2  RxSingleScheduler-1 187860148
2  RxSingleScheduler-1 187864889
Run Code Online (Sandbox Code Playgroud)

正如您已正确指出的那样,为了得到您想要的东西,您需要publish().refcount()或者简单地share()(它是别名)运算符.

这是因为publish()创建一个ConnectableObservable不会开始发出项目,直到通过该connect()方法告知这样做.在这种情况下,如果你这样做:

@Test
public void test() throws InterruptedException {
    final Scheduler single = Schedulers.single();
    final long l = System.nanoTime();
    ConnectableObservable<Long> dataStream =
            Observable.just(1, 2, 3)
                    .map(i -> System.nanoTime())
                    .subscribeOn(Schedulers.computation())
                    .observeOn(single)
                    .publish();

    dataStream.subscribe(i -> System.out.println("1  " + (i - l)));
    dataStream.subscribe(i -> System.out.println("2  " + (i - l)));

    Thread.sleep(1000);
    dataStream.connect();
    Thread.sleep(1000);

}
Run Code Online (Sandbox Code Playgroud)

你会注意到,在第一次Thread.sleep()调用(第一次调用)时没有任何反应,就在dataStream.connect()调用排放发生之后.

refCount()接收一个ConnectableObservable并connect()通过计算当前订阅的订户数来从订户隐藏调用的需要.它的作用是在它调用的第一个订阅connect()之后,在最后取消订阅之后取消订阅原始的observable.

至于相互取消publish().autoConnect(),之后你会得到一个可观察但是它有一个特殊的属性,比如说原始的observable通过互联网进行API调用(持续10秒),当你使用它时,share()你最终会得到as对服务器的许多并行查询,因为在这10秒内有订阅.另一方面,share()你将只有一个电话.

如果共享的可观察对象能够非常快速地完成其工作(例如just(1,2,3)),那么你就不会看到任何好处.

autoConnect()/ refCount()为您提供您订阅的中间可观察对象而不是原始可观察对象.

如果您有兴趣深入本书:使用RxJava进行反应式编程


Yar*_*hiy 10

常规(冷)可观察

在的心脏Observablesubscribe功能.每次新观察者订阅时,它都会作为参数传递给此函数.该功能的作用是将数据提供给单个观察者.它通过调用observer.onNext方法来完成.它可以立即(像是just),或通过一些调度程序(例如interval),或从后台线程或回调(例如,通过启动一些异步任务)来执行此操作.

我强调一句话上面,因为这是这个函数知道被调用时,他们唯一的观察者.如果您多次订阅此类可观察对象,subscribe则会为每个订阅者调用其函数.

像这样的数据源称为冷可观察.

调度程序

应用subscribeOn运算符会在您的subscribe调用和原始可观察subscribe函数之间添加中间步骤.您不再直接呼叫它,而是通过指定的调度程序安排呼叫.

observeOnonNext为观察者的所有调用添加类似的中间步骤.

在您的示例中,subscribe函数被调用两次,即数据系列生成两次.调用是通过多线程io调度程序调度的,因此这些调用不会发生在主线程上,而是发生在其他两个线程上,几乎同时发生.两个线程都开始调用onNext两个订阅者的方法.请记住,每个线程只知道自己的订户.onNext调度由mainThread调度程序调度,调度程序是单线程的,即它们不能同时发生但需要以某种方式排队.严格来说,无法保证这些呼叫的顺序.它取决于各种因素,具体是针对具体实施的.尝试更换just使用interval(此消息之间将引入的延迟),你会看到的消息将在不同的顺序到达.

热观察

publish运算符使您的可观察性变,即可连接.它为这两个subscribe函数添加了中间步骤- 这只被调用一次,而对于onNext方法 - 它们被传播到所有订阅的可观察对象.换句话说,它允许多个订阅者共享单个订阅.

确切地说,subscribe调用connect方法时会调用函数.有两个connect自动调用的运算符:

  • autoConnectconnect在第一个订阅者进入时调用方法.但它永远不会断开连接.
  • refCountconnect在第一个订户进入时调用,并在最后一个订阅者取消订阅时自动断开连接.subscribe当新订户进来时,它将重新连接(再次呼叫功能).

publish().refCount()是流行的组合,所以它有捷径:share().

对于您的教育,请尝试使用以下代码share:

Observable<Long> dataStream = Observable.interval(100, TimeUnit.MILLISECONDS)
        .take(3)
        .share();
System.out.println("subscribing A");
dataStream.subscribe(v -> System.out.println("A got " + v));
TimeUnit.MILLISECONDS.sleep(150);
System.out.println("subscribing B");
dataStream.subscribe(v -> System.out.println("B got " + v));
TimeUnit.SECONDS.sleep(1);
Run Code Online (Sandbox Code Playgroud)

原始问题的答案

1)冷可观察总是处理单个用户.所以你的时间图应如下所示:

subscribed first subscriber
[SUBSCRIBER: 1][ITEMS TO EMIT: 1,2,3]
subscribed second subscriber
[SUBSCRIBER: 1][ITEMS TO EMIT: 1,2,3]
[SUBSCRIBER: 2][ITEMS TO EMIT: 1,2,3]
emit "1" to subscriber 1
[SUBSCRIBER: 1][ITEMS TO EMIT: 2,3]
[SUBSCRIBER: 2][ITEMS TO EMIT: 1,2,3]
emit "1" to subscriber 2
[SUBSCRIBER: 1][ITEMS TO EMIT: 2,3]
[SUBSCRIBER: 2][ITEMS TO EMIT: 2,3]
...
Run Code Online (Sandbox Code Playgroud)

虽然由于多线程比赛而无法保证订单.

2)publish并且autoConnect不要互相取消.他们只是添加.

dataSource = ...;
dataSourceShared = dataSource.publish().autoConnect();
Run Code Online (Sandbox Code Playgroud)

现在,当您订阅多个订阅者时dataSourceShared,只有一个订阅原始版本dataSource.即,您不必为每个新订户发出新的消息系列.