Kotlin Flow onBackpressureDrop RxJava2 模拟

kro*_*kin 5 kotlin backpressure rx-java2 kotlin-coroutines kotlin-flow

在 RxJava 2 Flowable 中有不同的背压策略,其中最有趣的是:

  • 最新的
  • 缓冲
  • 降低

在整个 Rx 链中都受到尊重。

在 Kotlin 中有 Flow,它声明它具有开箱即用的背压支持。我能够通过使用以下内容使 Flow 具有 BUFFER 和 LATEST 策略:

对于缓冲器:

observeFlow()
    .buffer(10)
    .collect { ... }
Run Code Online (Sandbox Code Playgroud)

最新:

observeFlow()
    .conflate()
    .collect { ... }
Run Code Online (Sandbox Code Playgroud)

这只是同一个缓冲区运算符的快捷方式。

但是我找不到任何可以与 DROP 相同的东西。简而言之,当前一个值尚未处理时,DROP 将删除流中的任何值。对于 Flow,我什至不确定这是否可能。

考虑案例:

observeFlow()
    .backpressureDrop() // non-existent operator, just for illustrative purposes
    .map { ... }
    .flatMapMerge { ... }
    .collect { ... }
Run Code Online (Sandbox Code Playgroud)

所以 backpressureDrop 应该尊重在流下面完成的任何工作,而该操作员对下面发生的事情一无所知(没有来自底部的显式回调——就像 RxJava 订阅者中的“请求”方法)。因此似乎不太可能。在收集前一个项目之前,该操作员不应通过任何事件。

是否有任何现成的运算符,我想念它,或者是否有一种简单的方法可以使用现有的 API 实现这样的东西?

Kev*_*ini 3

我们可以使用由Rendezvous Channel支持的 Flow 来构建它

\n\n
\n

当容量为 0 \xe2\x80\x93 时,它会创建 RendezvousChannel。该通道根本没有任何缓冲区。仅当发送和接收调用及时相遇(会合)时,元素才会从发送方传输到接收方,因此发送会暂停,直到另一个协程调用接收,而接收会暂停,直到另一个协程调用发送。

\n
\n\n

Rendezvous 通道没有缓冲区。因此,该通道的消费者需要暂停并等待下一个元素,以便将元素发送到该通道。我们可以利用这种质量来删除在通道暂停的情况下无法接受的值Channel.offer,这是一个正常的非暂停函数。

\n\n

Channel.offer

\n\n
\n

如果可以立即添加元素而不违反容量限制,则将元素添加到此队列中并返回 true。否则,如果通道 isClosedForSend(详细信息请参阅 close),它会立即返回 false 或抛出异常。

\n
\n\n

因为channelFlow是缓冲的,所以我们需要申请Flow<T>.buffer下游应用到0。

\n\n
/**\n * Consume this [Flow] using a channelFlow with no buffer. Elements emitted from [this] flow\n * are offered to the underlying [channelFlow]. If the consumer is not currently suspended and \n * waiting for the next element, the element is dropped. \n * \n * @return a flow that only emits elements when the downstream [Flow.collect] is waiting for the next element\n */\nfun <T> Flow<T>.drop(): Flow<T> = channelFlow {\n    collect { offer(it) }\n}.buffer(capacity = 0)\n
Run Code Online (Sandbox Code Playgroud)\n\n

这是一个缓慢的消费者如何使用它来删除元素的示例。

\n\n
fun main() = runBlocking {\n    flow {\n        (0..100).forEach {\n            emit(it)\n            delay(100)\n        }\n    }.drop().collect {\n        delay(1000)\n        println(it)\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

与相应的输出:

\n\n
0\n11\n21\n31\n41\n51\n61\n71\n81\n91\n
Run Code Online (Sandbox Code Playgroud)\n