如何从 rxjava 平面地图调用协程用例

BRD*_*oid 8 android use-case kotlin rx-java kotlin-coroutines

你好,我有一个 rxjava 平面地图,我想在其中调用一个协程用例 onStandUseCase,它是一个 api 调用

最初,用例也是基于 rxjava 的,它曾经返回Observable<GenericResponse>并且工作正常,现在我将使用更改为基于协程,它只返回GenericResponse

如何修改平面图以与协程用例一起正常工作

        subscriptions += view.startFuellingObservable
        .onBackpressureLatest()
        .doOnNext { view.showLoader(false) }
        .flatMap {
            if (!hasOpenInopIncidents()) {
                //THIS IS WHERE THE ERROR IS IT RETURNS GENERICRESPONSE
                onStandUseCase(OnStandUseCase.Params("1", "2", TimestampedAction("1", "2", DateTime.now()))) {
                    
                }
               
            } else {
                val incidentOpenResponse = GenericResponse(false)
                incidentOpenResponse.error = OPEN_INCIDENTS
                Observable.just(incidentOpenResponse)
            }
        }
        .subscribe(
            { handleStartFuellingClicked(view, it) },
            { onStartFuellingError(view) }
        )
Run Code Online (Sandbox Code Playgroud)

OnStandUseCase.kt

class OnStandUseCase @Inject constructor(
    private val orderRepository: OrderRepository,
    private val serviceOrderTypeProvider: ServiceOrderTypeProvider
) : UseCaseCoroutine<GenericResponse, OnStandUseCase.Params>() {

    override suspend fun run(params: Params) = orderRepository.notifyOnStand(
        serviceOrderTypeProvider.apiPathFor(params.serviceType),
        params.id,
        params.action
    )

    data class Params(val serviceType: String, val id: String, val action: TimestampedAction)
}
Run Code Online (Sandbox Code Playgroud)

用例协程

abstract class UseCaseCoroutine<out Type, in Params> where Type : Any {

    abstract suspend fun run(params: Params): Type

    operator fun invoke(params: Params, onResult: (type: Type) -> Unit = {}) {
        val job = GlobalScope.async(Dispatchers.IO) { run(params) }
        GlobalScope.launch(Dispatchers.Main) { onResult(job.await()) }
    }
}
Run Code Online (Sandbox Code Playgroud)

startFuellingObservable 是

val startFuellingObservable: Observable<Void>
Run Code Online (Sandbox Code Playgroud)

这是错误的图像

在此输入图像描述

有关如何解决此问题的任何建议,请

提前致谢

Geo*_*ung 7

有链接 RxJava 和 Kotlin 协程的集成库

rxSingle可用于将挂起函数转换为Single. OP 想要一个Observable,所以我们可以要求toObservable()进行转换。

.flatMap {
    if (!hasOpenInopIncidents()) {
        rxSingle {
            callYourSuspendFunction()
        }.toObservable()
    } else {
        val incidentOpenResponse = GenericResponse(false)
        incidentOpenResponse.error = OPEN_INCIDENTS
        Observable.just(incidentOpenResponse)
    }
}
Run Code Online (Sandbox Code Playgroud)

请注意,Observable两个分支中的 s 仅包含一个元素。我们可以通过使用Observable#concatMapSingle使这一事实更加明显。

.concatMapSingle {
    if (!hasOpenInopIncidents()) {
        rxSingle { callYourSuspendFunction() }
    } else {
        val incidentOpenResponse = GenericResponse(false)
        incidentOpenResponse.error = OPEN_INCIDENTS
        Single.just(incidentOpenResponse)
    }
}
Run Code Online (Sandbox Code Playgroud)