有条件链单和可完成

Dak*_*ksh 3 parallel-processing multithreading android kotlin rx-java

我的 Rx 调用的整体工作流程应如下工作(无论当前的 Rx 代码如何):

  • 从 a 获取运动传感器读数列表Room Dao(目的是将它们上传到 REST API)。我正在Single<List<Reading>>为此使用
  • 如果该readings列表为空,则执行jobFinished()回调并在此之后不执行任何操作
  • 如果readings不为空,则将网络调用链接到 this Single。网络调用返回一个Completable
  • Single从来没有逻辑上抛出一个错误,因为它要么取空或非空readings列表
  • 当整个 Rx 调用链终止时,执行jobFinished()回调
  • 在整个 Rx 调用链成功时,将那些readings 从Dao
  • 在成功Single,但错误的Completable,更新读数Dao

我目前的代码如下:

  Single.create<List<Reading>> {
        readings = readingDao.getNextUploadBatch()

        if (readings.isEmpty()) {
            jobFinished(job, false)
            return@create
        }

        it.onSuccess(readings)
    }
            .flatMapCompletable { api.uploadSensorReadings(it) }
            .doOnTerminate {
                jobFinished(job, !readingDao.isEmpty())
            }
            .subscribeOn(rxSchedulers.network)
            .observeOn(rxSchedulers.database)
            .subscribe(
                    {
                        readingDao.delete(*readings.toTypedArray())
                    },
                    {
                        markCurrentReadingsAsNotUploading()
                    }
            )
Run Code Online (Sandbox Code Playgroud)



上面代码的逻辑问题是(没有在运行时测试过,但可以编译):

  • 我想从flatMapCompletableifreadings列表为空开始切断代码
  • doOnTerminate如果readings为空,我不想执行
  • 我不希望执行的onComplete部分(第一个{}块)subscribe除非readings非空,并且Completable返回成功
  • 我不希望执行的onError部分(第二个{}块)subscribe除非readings非空,并且Completable失败

我不确定如何将我的工作流程实现为高效且整洁的 Rx 调用链。任何建议将非常受欢迎!

aka*_*okd 5

如果您想根据值执行不同的操作,请考虑flatMap

Single.fromCallable(() -> readingDao.getNextUploadBatch())
.subscribeOn(rxSchedulers.network)
.flatMapCompletable(readings -> {
    if (readings.isEmpty()) {
        jobFinished(job, false);
        return Completable.complete();
    }
    return api.uploadSensorReadings(readings)
           .doFinally(() -> jobFinished(job, !readingDao.isEmpty()))
           .observeOn(rxSchedulers.database)
           .doOnComplete(() -> readingDao.delete(readings.toTypedArray()))
})
.subscribe(() -> /* ignored */, error -> markCurrentReadingsAsNotUploading());
Run Code Online (Sandbox Code Playgroud)