Sun*_*eUp 0 java reactive-programming rx-java rx-java2
我有多个热的 observables,它们可能会或可能不会发出项目。因此,我想组合 observables,然后在它们中的任何一个发出结果时处理结果,但如果其他 observables 在 item 处发出,则它们应该一起处理。
例如。
observable1 = PublishSubject<>()
observable2 = PublishSubject<>()
observable1.onNext(1)
observable1.onNext(2)
observable2.onNext("Test")
observable1.onNext(3)
Run Code Online (Sandbox Code Playgroud)
应该发出:
(1, null)
(2, null)
(2, "Test")
(3, "Test")
Run Code Online (Sandbox Code Playgroud)
也有可能observable2
在之前被发射observable1
CombineLatest
是最接近我需要的,但只有在所有可观察对象至少发出一项时才会发出结果。是否有一个反应式运算符?
您可以startWith
与每个源一起使用以提供初始值,或BehaviorSubject
与初始值一起使用,然后应用于combineLatest
这些增强的 Observable。然而,null
在 RxJava 2 中是不允许的,所以你必须在你的 observable 元素类型中找到一个中性值。
PublishSubject<Integer> observable1 = PublishSubject.create()
PublishSubject<String> observable2 = PublishSubject.create()
Observable.combineLates(
observable1.startWith(-100000),
observable2.startWith(""),
(a, b) -> a + b
)
.subscribe(System.out::println)
;
observable1.onNext(1)
observable1.onNext(2)
observable2.onNext("Test")
observable1.onNext(3)
Run Code Online (Sandbox Code Playgroud)
或者
BehaviorSubject<Integer> observable1 = BehaviorSubject.createDefault(-10000)
BehaviorSubject<String> observable2 = BehaviorSubject.createDefault("")
Observable.combineLates(
observable1,
observable2,
(a, b) -> a + b
)
.subscribe(System.out::println)
;
observable1.onNext(1)
observable1.onNext(2)
observable2.onNext("Test")
observable1.onNext(3)
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1309 次 |
最近记录: |