我已经创建了Observable一个由于BehaviourSubject具有很多功能的转换.现在我想分享它的值Observable,因此不会为每个新订户重新执行函数链.此外,我希望共享副本的行为与原始副本相同,即新到达的订阅者应该在订阅后获得最后一次发布的值.
在0.20.x它可以使用multicast(subjectFactory).refCount()工厂BehaviourSubject的,或简单地使用share(initialValue),而反过来使用BehaviourSubject而不是PublishSubject.
我如何实现相同的行为1.0.x?
我想你可以替换multicast(behaviorSubjectFactory).refCount()的replay(1).refCount().
为了使讨论更具体,这里有一个完整的例子(在Scala中):
@volatile var startTime: Long = 0
def printTimestamped(s: String) {
println(s"[t=${System.currentTimeMillis-startTime}] $s")
}
// Suppose for simplicity that the UI Events are just ticks of a
// hot timer observable.
val uiEvents = Observable.timer(1000 millis, 1000 millis)
.doOnEach(i => printTimestamped("producing " + i))
.publish
// Now apply all the transformations
val transformed = uiEvents.map(i => i + 101)
.doOnEach(i => printTimestamped("transformed to " + i))
// And set a default start value
val o1 = transformed.startWith(100)
// Share and make sure new subscribers get the last element replayed
// immediately after they subscribe:
val o2 = o1.replay(1).refCount
// startTime is just before we start the uiEvents observable
startTime = System.currentTimeMillis
val subscriptionUiEvents = uiEvents.connect
Thread.sleep(500)
printTimestamped("subscribing A")
val subscriptionA = o2.subscribe(i => printTimestamped("A got " + i))
Thread.sleep(2000)
printTimestamped("subscribing B")
val subscriptionB = o2.subscribe(i => printTimestamped("B got " + i))
Thread.sleep(2000)
printTimestamped("unsubscribing B")
subscriptionB.unsubscribe()
Thread.sleep(2000)
printTimestamped("unsubscribing A")
subscriptionA.unsubscribe()
// Now the transformations will stop being executed, but the UI
// events will still be produced
Thread.sleep(2000)
// Finally, also stop the UI events:
subscriptionUiEvents.unsubscribe()
Run Code Online (Sandbox Code Playgroud)
输出:
[t=505] subscribing A
[t=519] A got 100
[t=1002] producing 0
[t=1003] transformed to 101
[t=1003] A got 101
[t=2002] producing 1
[t=2002] transformed to 102
[t=2002] A got 102
[t=2520] subscribing B
[t=2521] B got 102
[t=3003] producing 2
[t=3003] transformed to 103
[t=3003] A got 103
[t=3003] B got 103
[t=4002] producing 3
[t=4002] transformed to 104
[t=4002] A got 104
[t=4002] B got 104
[t=4521] unsubscribing B
[t=5003] producing 4
[t=5003] transformed to 105
[t=5003] A got 105
[t=6002] producing 5
[t=6002] transformed to 106
[t=6002] A got 106
[t=6522] unsubscribing A
[t=7003] producing 6
[t=8002] producing 7
Run Code Online (Sandbox Code Playgroud)
原始答案:
删除了采用初始值的任何方法重载,因为startWith运算符已经允许这样做.
所以不要share(initialValue)只使用share().startWith(initialValue).
| 归档时间: |
|
| 查看次数: |
1528 次 |
| 最近记录: |