may*_*yyo 7 android kotlin rx-java kotlin-coroutines
我正在尝试将一部分代码从RX重构为协程,但是尽管我付出了所有努力,但我仍然迷路了。
因此,我创建了PublishSubject,并且正在向其发送消息,而且还在监听结果。它可以完美地工作,但是现在我不确定如何对协程(流或通道)执行相同的操作。
private val subject = PublishProcessor.create<Boolean>>()
...
fun someMethod(b: Boolean) {
subject.onNext(b)
}
fun observe() {
subject.debounce(500, TimeUnit.MILLISECONDS)
.subscribe {
// value received
}
}
Run Code Online (Sandbox Code Playgroud)
由于我需要使用反跳运算符,因此我真的想对流执行相同的操作,因此我创建了通道,然后尝试从该通道创建流并听取更改,但是没有得到任何结果。
private val channel = Channel<Boolean>()
...
fun someMethod(b: Boolean) {
channel.send(b)
}
fun observe() {
flow {
channel.consumeEach { value ->
emit(value)
}
}.debounce(500, TimeUnit.MILLISECONDS)
.onEach {
// value received
}
}
Run Code Online (Sandbox Code Playgroud)
我有什么问题?
Jem*_*rov 34
应该是SharedFlow/MutableSharedFlow为了PublishProcessor/PublishRelay
private val _myFlow = MutableSharedFlow<Boolean>(\n replay = 0,\n extraBufferCapacity = 1, // you can increase \n BufferOverflow.DROP_OLDEST\n)\nval myFlow = _myFlow.asSharedFlow()\n\n\n// ...\nfun someMethod(b: Boolean) {\n _myFlow.tryEmit(b)\n}\n\nfun observe() {\n myFlow.debounce(500)\n .onEach { }\n // flowOn(), catch{}\n .launchIn(coroutineScope)\n\n}\nRun Code Online (Sandbox Code Playgroud)\n而StateFlow/MutableStateFlow对于BehaviorProcessor/BehaviorRelay.
private val _myFlow = MutableStateFlow<Boolean>(false)\nval myFlow = _myFlow.asStateFlow()\n\n// ...\nfun someMethod(b: Boolean) {\n _myFlow.value = b // same as _myFlow.emit(v), myFlow.tryEmit(b)\n}\n\nfun observe() {\n myFlow.debounce(500)\n .onEach { }\n // flowOn(), catch{}\n .launchIn(coroutineScope)\n\n}\nRun Code Online (Sandbox Code Playgroud)\nStateFlow必须有初始值,如果您不希望这样,这是解决方法:
private val _myFlow = MutableStateFlow<Boolean?>(null)\nval myFlow = _myFlow.asStateFlow()\n .filterNotNull()\nRun Code Online (Sandbox Code Playgroud)\nMutableStateFlow设置新值时使用.equals比较,因此它不会一次又一次地发出相同的值(与distinctUntilChanged使用引用比较相比)。
所以MutableStateFlow\xe2\x89\x88 BehaviorProcessor.distinctUntilChanged()。如果你想要精确的BehaviorProcessor行为,那么你可以使用这个:
private val _myFlow = MutableSharedFlow<Boolean>(\n replay = 1, \n extraBufferCapacity = 0,\n BufferOverflow.DROP_OLDEST\n)\n\nRun Code Online (Sandbox Code Playgroud)\n
tyn*_*ynn 26
Flow是一个冷异步流,就像一个Observable.
流上的所有转换,例如
map和filter不触发流收集或执行,只有终端操作符(例如single)才会触发它。
该onEach方法只是一种转换。因此,您应该将其替换为终端流运算符collect。您也可以使用 aBroadcastChannel来获得更清晰的代码:
private val channel = BroadcastChannel<Boolean>(1)
suspend fun someMethod(b: Boolean) {
channel.send(b)
}
suspend fun observe() {
channel
.asFlow()
.debounce(500)
.collect {
// value received
}
}
Run Code Online (Sandbox Code Playgroud)
更新:在提出问题时,debounce有两个参数的重载(如问题中所示)。已经没有了。但是现在有一个以毫秒(Long)为单位接受一个参数。
Kotlin 协程中的 ArrayBroadcastChannel 与 PublishSubject 最相似。
与 PublishSubject 不同,背压内置于协程通道中,这就是缓冲区容量的来源。这个数字实际上取决于通道用于哪个用例。对于大多数正常用例,我只使用 10 个,这应该绰绰有余。如果您将事件推送到此通道的速度比使用它的接收器更快,则可以提供更多容量。
| 归档时间: |
|
| 查看次数: |
1295 次 |
| 最近记录: |