为什么我的RxJava Observable不会发出或完成,除非它被阻止?

Row*_*wan 8 java rx-java reactivex

背景

我有许多RxJava Observables(从Jersey客户端生成,或使用存根Observable.just(someObject)).所有这些都应该只发出一个值.我有一个组件测试,模拟了所有Jersey客户端和使用Observable.just(someObject),我看到了与运行生产代码时相同的行为.

我有几个类作用于这些可观察对象,执行一些计算(和一些副作用 - 我可能会在以后使它们成为直接返回值)并返回空的void observables.

有一次,在一个这样的类中,我试图将我的几个源可观察量压缩起来然后映射它们 - 如下所示:

public Observable<Void> doCalculation() {
    return Observable.zip(
        getObservable1(),
        getObservable2(),
        getObservable3(),
        UnifyingObject::new
    ).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
}

// in Unifying Object
public Observable<Void> processToNewObservable() {
    // ... do some calculation ...
    return Observable.empty();
}
Run Code Online (Sandbox Code Playgroud)

然后将计算类组合在一起并等待:

// Wait for rule computations to complete
List<Observable<Void>> calculations = ...;
Observable.zip(calculations, results -> results)
        .toBlocking().lastOrDefault(null);
Run Code Online (Sandbox Code Playgroud)

问题

问题是,processToNewObservable()永远不会被执行.通过消除的过程,我可以看到它就是getObservable1()麻烦 - 如果我用它替换它Observable.just(null),一切都按照我的想象执行(但是我想要一个真值的空值).

重申getObservable1()一下,在生产代码中从Jersey客户端返回一个Observable,但该客户端是Observable.just(someValue)在我的测试中返回的Mockito模拟.

调查

如果我转换getObservable1()到阻塞,然后再包的第一个值中just(),再次执行一切如我想像(但我不希望引入阻塞步骤):

Observable.zip(
    Observable.just(getObservable1().toBlocking().first()),
    getObservable2(),
    getObservable3(),
    UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
Run Code Online (Sandbox Code Playgroud)

我的第一个想法是,也许其他东西消耗了我的observable发出的值,并且zip看到它已经完成,因此确定压缩它们的结果应该是一个空的可观察量.我已经尝试添加.cache()到我认为相关的每个可观察源,但是,这并未改变行为.

我还尝试在zip之前在getObservable1上添加next/error/complete/finally处理程序(不将其转换为阻塞),但它们都没有执行:

getObservable1()
    .doOnNext(...)
    .doOnCompleted(...)
    .doOnError(...)
    .finallyDo(...);

Observable.zip(
    getObservable1(),
    getObservable2(),
    getObservable3(),
    UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
Run Code Online (Sandbox Code Playgroud)

这个问题

我对RxJava很新,所以我很确定我错过了一些基本的东西.问题是:我可以做些什么愚蠢的事情?如果从我到目前为止所说的不明显,我该怎么做才能帮助诊断问题?

Row*_*wan 0

注意:我在这里没有找到非常令人满意的答案,所以我进一步挖掘并发现了一个更小的复制案例,所以我在这里提出了一个新问题:为什么我的 RxJava Observable 仅向第一个消费者发出?


我已经解决了至少部分问题(并且,向所有试图回答的人致歉,鉴于我的解释,我认为您没有太多机会)。

执行这些计算的各种类都返回了Observable.empty()(按照processToNewObservable()我原来的示例)。据我所知,Observable.zip()不会订阅第 N 个可观察量,它会一直压缩,直到第 N-1 个可观察量发出值为止。

我最初的例子声称这是getObservable1()行为不当 - 实际上有点不准确,这是后来在参数列表中观察到的。据我了解,使其阻塞,然后再次将该值转换为 Observable 的原因是因为使其阻塞并首先调用强制其执行,并且我得到了我想要的副作用。

如果我将所有计算类更改为 return Observable.just(null),则一切都会正常:所有计算类的可观察值的最终值zip()都会通过它们全部起作用,因此所有预期的副作用都会发生。

从设计的角度来看,返回 null Void 似乎我肯定做错了什么,但至少这个特定问题得到了解答。