假设您要在Observable链中插入Completable,例如对于每个发出的元素,有一个可运行的块并在其完成之前阻塞,您会选择哪个选项?(这里Completable.complete()只是举个例子)
.flatMap { Completable.complete().andThen(Observable.just(it)) }
.doOnNext { Completable.complete().blockingAwait() }
别的什么?
我是使用 Kotlin 进行 Android 开发的新手,我正在努力寻找任何有用的文档,了解如何使用当前最佳实践创建简单的 GET 和 POST 请求。我来自 Angular 开发,我们使用 RxJS 进行响应式开发。
通常我会创建一个服务文件来保存我的所有请求函数,然后我会在任何组件中使用此服务并订阅可观察的。
在 Android 中你会如何做到这一点?有没有一个好的开始示例来说明必须创建的东西?从第一眼看上去,一切都显得如此复杂和过度设计
我有一个场景,我需要定期调用API来检查结果.我正在Flowable.interval创建一个调用API的区间函数.
但是,我在背压方面遇到了麻烦.在下面的示例中,在区间中的每个刻度上创建一个新单.如果呼叫尚未进行,则所需的效果是仅调用API
Flowable.interval(1, 1, TimeUnit.SECONDS).flatMap {
System.out.println("Delay $it")
//simulates API call
Single.just(1L).doAfterSuccess {
System.out.println("NEW SINGLE!!!")
}.delay(4, TimeUnit.SECONDS).doAfterSuccess {
System.out.println("SINGLE SUCCESS!!!")
}.toFlowable()
}.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).blockingFirst()
Run Code Online (Sandbox Code Playgroud)
我可以使用像这样的过滤变量来解决这个问题:
var filter = true
Flowable.interval(1, 1, TimeUnit.SECONDS).filter {
filter
}.flatMap {
System.out.println("Delay $it")
Single.just(1L).doOnSubscribe {
filter = true
}.doAfterSuccess {
System.out.println("NEW SINGLE!!!")
}.delay(4, TimeUnit.SECONDS).doAfterSuccess {
System.out.println("SINGLE!!!")
filter = true
}.toFlowable()
}.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).blockingFirst()
Run Code Online (Sandbox Code Playgroud)
但它似乎是一个hacky解决方案.我已经厌倦了onBackPressureDrop在interval功能之后应用,但它没有效果.
有什么建议?
我知道如何在RxJava 2 中做到这一点。
而且我知道RxKotlin如何帮助解决类似问题。
但似乎 RxKotlin.Observables 没有列表重载的这个辅助函数,我无法弄清楚。你会怎么做?
我有一个通过 Retrofit 调用 API 的 rx 链。我使用标准 rx 方法订阅我的 API 服务,subscribe({...})并向其传递 lambda。不幸的是,当我的调用最终完成时,我添加的要在 lambda 内执行的所有代码都被完全忽略。AndroidStudio 建议了一个修复程序,该修复程序基本上run为我的 lamda 添加了一个内联函数,并且......它神奇地工作了。我不知道发生了什么事。为什么没有它就不起作用run?有什么run作用?
代码如下:
valuesServiceApi.getValues()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ data ->
run { //<- What's this?
val cs = data.creditReportInfo.score
view.setCreditScore(cs)
Logger.getLogger("success:").info("credit score $cs")
}
})
Run Code Online (Sandbox Code Playgroud) 我有一些代码会阻止while循环中的操作(从服务器下载一些数据)。客户不知道每个步骤要退回多少物品。下载N个项目时循环中断。
val n = 10
val list = ArrayList<T>()
while (list.size < n) {
val lastItemId = list.last()?.id ?: 0
val items = downloadItems(lastItemId)
list.addAll(items)
}
Run Code Online (Sandbox Code Playgroud)
downloadItems执行阻止的HTTP调用并返回列表。现在,假设downloadItems更改,新的返回类型为Observable<Item>。我如何在不执行类似操作的情况下更改代码以使用RxJava blockingGet?
这个问题与Android和生命周期有关。以前,我会有一系列主题并在创作时订阅它们。
销毁后,我会将所有主题标记为完整,假设它处理了所有订阅者。
在 Android Studio 3.1 中,我会收到针对“未使用”的任何订阅者的警告。解决方案是将它们添加到“完全一次性”中,然后我在销毁时进行处理。
我需要“复合一次性”来在销毁时正确取消请求吗?我之前将主题标记为完整的方法有什么作用吗?在这种情况下有必要吗?
作为代码示例:
val observable: PublishSubject<Int> = PublishSubject.create()
val disposable = observable.subscribe { /* subscription */ }
fun onDestroy() {
observable.onComplete() // is this line necessary or helpful?
disposable.dispose()
}
Run Code Online (Sandbox Code Playgroud) 我需要在链中调用一个可完成的方法,完成后,它需要使用Map运算符
示例继续链
Single.just(someOperation())
.flatMapCompletable{
completableMethod()
}
.map{ // i need to continue here with map or some other operator
doSomeOperation()
}
Run Code Online (Sandbox Code Playgroud)
谁能帮我解决这个问题吗?