Rxjava:如何在没有完成所有 observable 的情况下组合多个 observable?

Sun*_*eUp 0 java reactive-programming rx-java rx-java2

我有多个热的 observables,它们可能会或可能不会发出项目。因此,我想组合 observables,然后在它们中的任何一个发出结果时处理结果,但如果其他 observables 在 item 处发出,则它们应该一起处理。

例如。

observable1 = PublishSubject<>()  
observable2 = PublishSubject<>()

observable1.onNext(1)  
observable1.onNext(2)  
observable2.onNext("Test")  
observable1.onNext(3)
Run Code Online (Sandbox Code Playgroud)

应该发出:

(1, null) 
(2, null)
(2, "Test")
(3, "Test")
Run Code Online (Sandbox Code Playgroud)

也有可能observable2在之前被发射observable1

CombineLatest是最接近我需要的,但只有在所有可观察对象至少发出一项时才会发出结果。是否有一个反应式运算符?

aka*_*okd 5

您可以startWith与每个源一起使用以提供初始值,或BehaviorSubject与初始值一起使用,然后应用于combineLatest这些增强的 Observable。然而,null在 RxJava 2 中是不允许的,所以你必须在你的 observable 元素类型中找到一个中性值。

PublishSubject<Integer> observable1 = PublishSubject.create()  
PublishSubject<String>  observable2 = PublishSubject.create()

Observable.combineLates(
    observable1.startWith(-100000),
    observable2.startWith(""),
    (a, b) -> a + b
)
.subscribe(System.out::println)
;

observable1.onNext(1)  
observable1.onNext(2)  
observable2.onNext("Test")  
observable1.onNext(3)
Run Code Online (Sandbox Code Playgroud)

或者

BehaviorSubject<Integer> observable1 = BehaviorSubject.createDefault(-10000)  
BehaviorSubject<String>  observable2 = BehaviorSubject.createDefault("")

Observable.combineLates(
    observable1,
    observable2,
    (a, b) -> a + b
)
.subscribe(System.out::println)
;

observable1.onNext(1)  
observable1.onNext(2)  
observable2.onNext("Test")  
observable1.onNext(3)
Run Code Online (Sandbox Code Playgroud)