Dak*_*ksh 3 parallel-processing multithreading android kotlin rx-java
我的 Rx 调用的整体工作流程应如下工作(无论当前的 Rx 代码如何):
Room Dao(目的是将它们上传到 REST API)。我正在Single<List<Reading>>为此使用readings列表为空,则执行jobFinished()回调并在此之后不执行任何操作readings不为空,则将网络调用链接到 this Single。网络调用返回一个CompletableSingle从来没有逻辑上抛出一个错误,因为它要么取空或非空readings列表jobFinished()回调readings 从DaoSingle,但错误的Completable,更新读数Dao我目前的代码如下:
Single.create<List<Reading>> {
readings = readingDao.getNextUploadBatch()
if (readings.isEmpty()) {
jobFinished(job, false)
return@create
}
it.onSuccess(readings)
}
.flatMapCompletable { api.uploadSensorReadings(it) }
.doOnTerminate {
jobFinished(job, !readingDao.isEmpty())
}
.subscribeOn(rxSchedulers.network)
.observeOn(rxSchedulers.database)
.subscribe(
{
readingDao.delete(*readings.toTypedArray())
},
{
markCurrentReadingsAsNotUploading()
}
)
Run Code Online (Sandbox Code Playgroud)
上面代码的逻辑问题是(没有在运行时测试过,但可以编译):
flatMapCompletableifreadings列表为空开始切断代码doOnTerminate如果readings为空,我不想执行onComplete部分(第一个{}块)subscribe除非readings非空,并且Completable返回成功onError部分(第二个{}块)subscribe除非readings非空,并且Completable失败我不确定如何将我的工作流程实现为高效且整洁的 Rx 调用链。任何建议将非常受欢迎!
如果您想根据值执行不同的操作,请考虑flatMap:
Single.fromCallable(() -> readingDao.getNextUploadBatch())
.subscribeOn(rxSchedulers.network)
.flatMapCompletable(readings -> {
if (readings.isEmpty()) {
jobFinished(job, false);
return Completable.complete();
}
return api.uploadSensorReadings(readings)
.doFinally(() -> jobFinished(job, !readingDao.isEmpty()))
.observeOn(rxSchedulers.database)
.doOnComplete(() -> readingDao.delete(readings.toTypedArray()))
})
.subscribe(() -> /* ignored */, error -> markCurrentReadingsAsNotUploading());
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3402 次 |
| 最近记录: |