K.O*_*.Os 5 kotlin rx-java rx-java2
我正在尝试创建 Observable,它将在网络连接建立时重试。
我创建了主题:
private val retrySubject = PublishSubject.create<Unit>()()
Run Code Online (Sandbox Code Playgroud)
我这样使用它:
private fun publishNetworkReconnection() {
compositeDisposable?.add(
connectionHelper.observeConnection()
.subscribe {connected: Boolean
if(connected){
retrySubject.onNext(null)
}
}
)
}
Run Code Online (Sandbox Code Playgroud)
然后我尝试在 retryWhen 运算符中使用它:
val disposable =
Flowable.interval(0, UPDATE_INTERVAL, TimeUnit.SECONDS, Schedulers.io())
.onBackpressureDrop()
.flatMapCompletable {
revocationRepository.sync(event.id)
}
.retryWhen { retryHandler -> retryHandler.flatMap({ nothing -> retrySubject.asObservable() }) }
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ }, { Timber.e(it, "Unable to sync blacklist") })
compositeDisposable?.add(disposable)
}
Run Code Online (Sandbox Code Playgroud)
这种情况下该如何正确使用呢?
我在 Android Studio 中收到此错误:
类型不匹配。必填:发布者<< out (???..???) >>! 发现:可观察<无效!>!
问题是,您正在混合 RxJava 1 -Subject.asObservable
和 RxJava 2 - Flowable
。
retryWhen
RxJava 2的签名是:
Flowable<T> retryWhen(Function<? super Flowable<Throwable>,? extends Publisher<?>> handler)
Run Code Online (Sandbox Code Playgroud)
所以 lambda 里面retryWhen
应该返回一些扩展的东西Publisher
。相反,您返回Subject<Unit>
的是转换为Observable<Void>
,因为显然您Subject
来自 RxJava 1。并且 - 由于 RxJava 版本差异 - 显然它不会扩展Publisher
。
因此错误消息是正确的,retryWhen
预期Publisher
来自RxJava 2,但您Obervable
从RxJava 1给出。您没有注意到不同的软件包,因为它们未包含在消息中。
主要问题是混合 RxJava 1 和 RxJava 2 的代码,这从来都不是好事。
归档时间: |
|
查看次数: |
1893 次 |
最近记录: |