RxJava与新订阅者的最后一个值共享

ava*_*nev 6 rx-java

我已经创建了Observable一个由于BehaviourSubject具有很多功能的转换.现在我想分享它的值Observable,因此不会为每个新订户重新执行函数链.此外,我希望共享副本的行为与原始副本相同,即新到达的订阅者应该在订阅后获得最后一次发布的值.

0.20.x它可以使用multicast(subjectFactory).refCount()工厂BehaviourSubject的,或简单地使用share(initialValue),而反过来使用BehaviourSubject而不是PublishSubject.

我如何实现相同的行为1.0.x

Sam*_*ter 7

我想你可以替换multicast(behaviorSubjectFactory).refCount()replay(1).refCount().

为了使讨论更具体,这里有一个完整的例子(在Scala中):

@volatile var startTime: Long = 0
def printTimestamped(s: String) {
  println(s"[t=${System.currentTimeMillis-startTime}] $s")
}

// Suppose for simplicity that the UI Events are just ticks of a
// hot timer observable.
val uiEvents = Observable.timer(1000 millis, 1000 millis)
        .doOnEach(i => printTimestamped("producing " + i))
        .publish

// Now apply all the transformations
val transformed = uiEvents.map(i => i + 101)
        .doOnEach(i => printTimestamped("transformed to " + i))

// And set a default start value
val o1 = transformed.startWith(100)

// Share and make sure new subscribers get the last element replayed
// immediately after they subscribe:
val o2 = o1.replay(1).refCount

// startTime is just before we start the uiEvents observable
startTime = System.currentTimeMillis
val subscriptionUiEvents = uiEvents.connect

Thread.sleep(500)

printTimestamped("subscribing A")
val subscriptionA = o2.subscribe(i => printTimestamped("A got " + i))

Thread.sleep(2000)

printTimestamped("subscribing B")
val subscriptionB = o2.subscribe(i => printTimestamped("B got " + i))

Thread.sleep(2000)

printTimestamped("unsubscribing B")
subscriptionB.unsubscribe()

Thread.sleep(2000)

printTimestamped("unsubscribing A")
subscriptionA.unsubscribe()

// Now the transformations will stop being executed, but the UI
// events will still be produced

Thread.sleep(2000)

// Finally, also stop the UI events:
subscriptionUiEvents.unsubscribe()
Run Code Online (Sandbox Code Playgroud)

输出:

[t=505] subscribing A
[t=519] A got 100
[t=1002] producing 0
[t=1003] transformed to 101
[t=1003] A got 101
[t=2002] producing 1
[t=2002] transformed to 102
[t=2002] A got 102
[t=2520] subscribing B
[t=2521] B got 102
[t=3003] producing 2
[t=3003] transformed to 103
[t=3003] A got 103
[t=3003] B got 103
[t=4002] producing 3
[t=4002] transformed to 104
[t=4002] A got 104
[t=4002] B got 104
[t=4521] unsubscribing B
[t=5003] producing 4
[t=5003] transformed to 105
[t=5003] A got 105
[t=6002] producing 5
[t=6002] transformed to 106
[t=6002] A got 106
[t=6522] unsubscribing A
[t=7003] producing 6
[t=8002] producing 7
Run Code Online (Sandbox Code Playgroud)

原始答案:

引用1.0.0发行说明:

删除了采用初始值的任何方法重载,因为startWith运算符已经允许这样做.

所以不要share(initialValue)只使用share().startWith(initialValue).