发布主题与Kotlin协程(流程)

may*_*yyo 7 android kotlin rx-java kotlin-coroutines

我正在尝试将一部分代码从RX重构为协程,但是尽管我付出了所有努力,但我仍然迷路了。

因此,我创建了PublishSubject,并且正在向其发送消息,而且还在监听结果。它可以完美地工作,但是现在我不确定如何对协程(流或通道)执行相同的操作。

private val subject = PublishProcessor.create<Boolean>>()

...

fun someMethod(b: Boolean) {
    subject.onNext(b)
}

fun observe() {
    subject.debounce(500, TimeUnit.MILLISECONDS)
           .subscribe {
         // value received
    }
}
Run Code Online (Sandbox Code Playgroud)

由于我需要使用反跳运算符,因此我真的想对流执行相同的操作,因此我创建了通道,然后尝试从该通道创建流并听取更改,但是没有得到任何结果。

private val channel = Channel<Boolean>()

...

fun someMethod(b: Boolean) {
    channel.send(b)
}

fun observe() {
    flow {
         channel.consumeEach { value ->
            emit(value)
         }
    }.debounce(500, TimeUnit.MILLISECONDS)
    .onEach {
        // value received
    }
}
Run Code Online (Sandbox Code Playgroud)

我有什么问题?

Jem*_*rov 34

应该是SharedFlow/MutableSharedFlow为了PublishProcessor/PublishRelay

\n
private val _myFlow = MutableSharedFlow<Boolean>(\n                          replay = 0,\n                          extraBufferCapacity = 1, // you can increase      \n                          BufferOverflow.DROP_OLDEST\n)\nval myFlow = _myFlow.asSharedFlow()\n\n\n// ...\nfun someMethod(b: Boolean) {\n    _myFlow.tryEmit(b)\n}\n\nfun observe() {\n    myFlow.debounce(500)\n          .onEach {  }\n          // flowOn(), catch{}\n          .launchIn(coroutineScope)\n\n}\n
Run Code Online (Sandbox Code Playgroud)\n

StateFlow/MutableStateFlow对于BehaviorProcessor/BehaviorRelay.

\n
private val _myFlow = MutableStateFlow<Boolean>(false)\nval myFlow = _myFlow.asStateFlow()\n\n// ...\nfun someMethod(b: Boolean) {\n    _myFlow.value = b // same as _myFlow.emit(v), myFlow.tryEmit(b)\n}\n\nfun observe() {\n    myFlow.debounce(500)\n          .onEach {  }\n          // flowOn(), catch{}\n          .launchIn(coroutineScope)\n\n}\n
Run Code Online (Sandbox Code Playgroud)\n

StateFlow必须有初始值,如果您不希望这样,这是解决方法:

\n
private val _myFlow = MutableStateFlow<Boolean?>(null)\nval myFlow = _myFlow.asStateFlow()\n                    .filterNotNull()\n
Run Code Online (Sandbox Code Playgroud)\n

MutableStateFlow设置新值时使用.equals比较,因此它不会一次又一次地发出相同的值(与distinctUntilChanged使用引用比较相比)。

\n

所以MutableStateFlow\xe2\x89\x88 BehaviorProcessor.distinctUntilChanged()。如果你想要精确的BehaviorProcessor行为,那么你可以使用这个:

\n
private val _myFlow = MutableSharedFlow<Boolean>(\n                              replay = 1, \n                              extraBufferCapacity = 0,\n                              BufferOverflow.DROP_OLDEST\n)\n\n
Run Code Online (Sandbox Code Playgroud)\n


tyn*_*ynn 26

Flow是一个冷异步流,就像一个Observable.

流上的所有转换,例如mapfilter不触发流收集或执行,只有终端操作符(例如single)才会触发它。

onEach方法只是一种转换。因此,您应该将其替换为终端流运算符collect。您也可以使用 aBroadcastChannel来获得更清晰的代码:

private val channel = BroadcastChannel<Boolean>(1)

suspend fun someMethod(b: Boolean) {
    channel.send(b)
}

suspend fun observe() {
  channel
    .asFlow()
    .debounce(500)
    .collect {
        // value received
    }
}
Run Code Online (Sandbox Code Playgroud)

更新:在提出问题时,debounce有两个参数的重载(如问题中所示)。已经没有了。但是现在有一个以毫秒(Long)为单位接受一个参数。

  • “BroadcastChannel”在 1.5 中已被弃用,取而代之的是“SharedFlow”和“StateFlow”。 (4认同)
  • 值得一提的是,“BroadcastChannel”仍处于实验阶段。 (2认同)

Nis*_*nth 6

Kotlin 协程中的 ArrayBroadcastChannel 与 PublishSubject 最相似。

  1. 与 PublishSubject 一样,一个 ArrayBroadcastChannel 可以有多个订阅者,并且所有活动订阅者都会立即收到通知。
  2. 与 PublishSubject 一样,如果此时没有活动订阅者,推送到此频道的事件将丢失。

与 PublishSubject 不同,背压内置于协程通道中,这就是缓冲区容量的来源。这个数字实际上取决于通道用于哪个用例。对于大多数正常用例,我只使用 10 个,这应该绰绰有余。如果您将事件推送到此通道的速度比使用它的接收器更快,则可以提供更多容量。

  • 仅供所有发现此评论的人使用:ArrayBroadcastChannel 是一个内部类,但根据 javadoc:“此通道是由 `BroadcastChannel(capacity)` 工厂函数调用创建的。” - 所以您需要做的就是创建一个常用的 BroadcastChannel 并将容量传递给其构造函数,例如 - BroadcastChannel(1)。 (5认同)