使用 RxJava retryWhen 运算符时 Kotlin 类型不匹配

K.O*_*.Os 5 kotlin rx-java rx-java2

我正在尝试创建 Observable,它将在网络连接建立时重试。

我创建了主题:

 private val retrySubject = PublishSubject.create<Unit>()()
Run Code Online (Sandbox Code Playgroud)

我这样使用它:

  private fun publishNetworkReconnection() {
    compositeDisposable?.add(
       connectionHelper.observeConnection()
        .subscribe {connected: Boolean
          if(connected){
          retrySubject.onNext(null)
          }
        }
     )
  }
Run Code Online (Sandbox Code Playgroud)

然后我尝试在 retryWhen 运算符中使用它:

   val disposable =
    Flowable.interval(0, UPDATE_INTERVAL, TimeUnit.SECONDS, Schedulers.io())
        .onBackpressureDrop()
        .flatMapCompletable {
          revocationRepository.sync(event.id)
        }
        .retryWhen { retryHandler -> retryHandler.flatMap({ nothing -> retrySubject.asObservable() }) }
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({ }, { Timber.e(it, "Unable to sync blacklist") })
    compositeDisposable?.add(disposable)
  }
Run Code Online (Sandbox Code Playgroud)

这种情况下该如何正确使用呢?

我在 Android Studio 中收到此错误:

类型不匹配。必填:发布者<< out (???..???) >>! 发现:可观察<无效!>!

mic*_*brz 5

问题是,您正在混合 RxJava 1 -Subject.asObservable和 RxJava 2 - Flowable

retryWhenRxJava 2的签名是:

Flowable<T> retryWhen(Function<? super Flowable<Throwable>,? extends Publisher<?>> handler)
Run Code Online (Sandbox Code Playgroud)

所以 lambda 里面retryWhen应该返回一些扩展的东西Publisher。相反,您返回Subject<Unit>的是转换为Observable<Void>,因为显然您Subject来自 RxJava 1。并且 - 由于 RxJava 版本差异 - 显然它不会扩展Publisher

因此错误消息是正确的,retryWhen预期Publisher来自RxJava 2,但您ObervableRxJava 1给出。您没有注意到不同的软件包,因为它们未包含在消息中。

主要问题是混合 RxJava 1 和 RxJava 2 的代码,这从来都不是好事。