exp*_*ert 3 kotlin project-reactor reactive-streams reactor-netty
你能解释一下返回的 Flux/Mono 到底发生了HttpClient.response() 什么吗?我认为在 Mono 完成之前,http 客户端生成的值不会传递到下游,但我看到生成了大量请求,但最终以reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 8异常告终。它按预期工作(项目正在处理一个接一个),如果我更换呼叫testRequest()用Mono.fromCallable { }。
我错过了什么?
测试代码:
import org.asynchttpclient.netty.util.ByteBufUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider
class Test {
private val client = HttpClient.create(ConnectionProvider.create("meh", 4))
fun main() {
Flux.fromIterable(0..99)
.flatMap { obj ->
println("Creating request for: $obj")
testRequest()
.doOnError { ex ->
println("Failed request for: $obj")
ex.printStackTrace()
}
.map { res ->
obj to res
}
}
.doOnNext { (obj, res) ->
println("Created request for: $obj ${res.length} characters")
}
.collectList().block()!!
}
fun testRequest(): Mono<String> {
return client.get()
.uri("https://projectreactor.io/docs/netty/release/reference/index.html#_connection_pool")
.responseContent()
.reduce(StringBuilder(), { sb, buf ->
val str= ByteBufUtils.byteBuf2String(Charsets.UTF_8, buf)
sb.append(str)
})
.map { it.toString() }
}
}
Run Code Online (Sandbox Code Playgroud)
当你ConnectionProvider像这样创建时ConnectionProvider.create("meh", 4),这意味着最大连接数为 4 和最大挂起请求数为 8 的连接池。请参阅此处了解更多信息。
当您使用flatMap此方式时,Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave请参阅此处了解有关此内容的更多信息。
所以发生的情况是您试图同时运行所有请求。
所以你有两个选择:
flatMap增加待处理请求的数量。concatMap代替flatMap,这意味着Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation。在此处查看更多信息。| 归档时间: |
|
| 查看次数: |
1380 次 |
| 最近记录: |