kro*_*kin 5 kotlin backpressure rx-java2 kotlin-coroutines kotlin-flow
在 RxJava 2 Flowable 中有不同的背压策略,其中最有趣的是:
在整个 Rx 链中都受到尊重。
在 Kotlin 中有 Flow,它声明它具有开箱即用的背压支持。我能够通过使用以下内容使 Flow 具有 BUFFER 和 LATEST 策略:
对于缓冲器:
observeFlow()
.buffer(10)
.collect { ... }
Run Code Online (Sandbox Code Playgroud)
最新:
observeFlow()
.conflate()
.collect { ... }
Run Code Online (Sandbox Code Playgroud)
这只是同一个缓冲区运算符的快捷方式。
但是我找不到任何可以与 DROP 相同的东西。简而言之,当前一个值尚未处理时,DROP 将删除流中的任何值。对于 Flow,我什至不确定这是否可能。
考虑案例:
observeFlow()
.backpressureDrop() // non-existent operator, just for illustrative purposes
.map { ... }
.flatMapMerge { ... }
.collect { ... }
Run Code Online (Sandbox Code Playgroud)
所以 backpressureDrop 应该尊重在流下面完成的任何工作,而该操作员对下面发生的事情一无所知(没有来自底部的显式回调——就像 RxJava 订阅者中的“请求”方法)。因此似乎不太可能。在收集前一个项目之前,该操作员不应通过任何事件。
是否有任何现成的运算符,我想念它,或者是否有一种简单的方法可以使用现有的 API 实现这样的东西?
我们可以使用由Rendezvous Channel支持的 Flow 来构建它。
\n\n\n\n\n当容量为 0 \xe2\x80\x93 时,它会创建 RendezvousChannel。该通道根本没有任何缓冲区。仅当发送和接收调用及时相遇(会合)时,元素才会从发送方传输到接收方,因此发送会暂停,直到另一个协程调用接收,而接收会暂停,直到另一个协程调用发送。
\n
Rendezvous 通道没有缓冲区。因此,该通道的消费者需要暂停并等待下一个元素,以便将元素发送到该通道。我们可以利用这种质量来删除在通道暂停的情况下无法接受的值Channel.offer,这是一个正常的非暂停函数。
\n\n\n如果可以立即添加元素而不违反容量限制,则将元素添加到此队列中并返回 true。否则,如果通道 isClosedForSend(详细信息请参阅 close),它会立即返回 false 或抛出异常。
\n
因为channelFlow是缓冲的,所以我们需要申请Flow<T>.buffer下游应用到0。
/**\n * Consume this [Flow] using a channelFlow with no buffer. Elements emitted from [this] flow\n * are offered to the underlying [channelFlow]. If the consumer is not currently suspended and \n * waiting for the next element, the element is dropped. \n * \n * @return a flow that only emits elements when the downstream [Flow.collect] is waiting for the next element\n */\nfun <T> Flow<T>.drop(): Flow<T> = channelFlow {\n collect { offer(it) }\n}.buffer(capacity = 0)\nRun Code Online (Sandbox Code Playgroud)\n\n这是一个缓慢的消费者如何使用它来删除元素的示例。
\n\nfun main() = runBlocking {\n flow {\n (0..100).forEach {\n emit(it)\n delay(100)\n }\n }.drop().collect {\n delay(1000)\n println(it)\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n\n与相应的输出:
\n\n0\n11\n21\n31\n41\n51\n61\n71\n81\n91\nRun Code Online (Sandbox Code Playgroud)\n
| 归档时间: |
|
| 查看次数: |
1037 次 |
| 最近记录: |