如何在RxJava中正确转换多播可观察对象

Mat*_*nič 6 java android reactive-programming rx-java rx-java2

假设我有一个事件发射数据源,我想将其转换为反应流.数据源由资源绑定(例如,定期发送更新状态的套接字),因此我希望共享该资源的单个订阅.使用单个observable replay(对于新订户立即获得当前值),refCount运营商似乎非常适合这种情况.例如,这是他的MyDataProvider单身人士的样子:

private final Observable<MyData> myDataObservable = Observable.<MyData>create(emitter -> {
    // Open my resource here and emit data into observable
})
    .doOnDispose(() -> {
        // Close my resource here
    })
    .replay(1)
    .refCount();

public Observable<MyData> getMyDataObservable() {
    return myDataObservable;
}
Run Code Online (Sandbox Code Playgroud)

但是,现在假设我有另一个数据源需要第一个数据源的结果来计算自己的值:

private final Observable<AnotherData> anotherDataObservable = getMyDataProvider().getMyDataObservable()
    .flatMap(myData -> {
        // Call another data source and return the result here
    })

public Observable<AnotherData> getAnotherDataObservable() {
    return anotherDataObservable;
}
Run Code Online (Sandbox Code Playgroud)

在这里,我的设置开始崩溃.第一个可观测量的多播仅适用于refCount操作员.在那之后,一切都再次单播.这意味着如果进行两次单独的订阅anotherDataProvider,flatMap则将调用operator两次.我看到了两个解决方法,但我不喜欢这两个:

1.在多播发生之前转换第一个observable

最简单的解决方法似乎是我在myDataObservable某个地方保存单播变体,然后再进行多播操作,然后执行该多播操作.anotherDataObservable但是,如果这两个可观察对象位于不同的模块中,这种解决方法会使代码非常不优雅,需要MyDataProvider暴露两个看似返回相同数据的不同可观察量.

2.只需使用重复的多播运营商

第二种解决方法似乎是再次应用这些replayrefCount运营商anotherDataObservable.但这会造成效率低下,因为myDataObservable已经应用了第一个多播运算符,但现在除了浪费内存和CPU周期之外什么都不做.

这两种解决方法还涉及AnotherDataProvider到的耦合MyDataProvider.如果将来MyDataProvider不再需要更改和多播,我还必须更新AnotherDataProvider以从那里删除多播运营商.

解决这个问题的更优雅的方法是什么?我可以更好地构建一个完全避免这个问题的架构吗?

Tpo*_*6oH 3

关于您的第一种方法,在当前的设置中,您的anotherDataObservable用途myDataObservable和据我所知它们在逻辑上是耦合的,因为它们使用相同的源。因此,您需要为它们提供一些基本的共享逻辑。我会将其提取到一个公共模块,该模块将公开可观察对象的单播版本,然后在不同的模块中制作myDataObservableanotherDataObservable使用它,每个模块都添加多播逻辑。

另一种选择是有一个类,通过像 in 一样订阅资源来监视您的资源myDataObservable,在中进行处理并使用主题onNext发布映射结果,即BehavioralSubject(如果您希望始终有权访问最后发布的值)和原始值结果与另一个主题。客户端将订阅该主题,并将获得仅在监控类中计算一次的映射值或原始值。

PS 请记住在订阅之前为您的主题添加反压策略。

如果这些选项不适合您,请考虑一下避免flatMap多次致电是否真的很重要?您的代码非常简单,它是一个重要的指标。如果 flatMap 不重,您可以让它运行多次。