wuj*_*jek 1 project-reactor spring-webclient
编辑:这里https://github.com/wujek-srujek/reactor-retry-test是包含所有代码的存储库。
我有以下 SpringWebClient代码来 POST 到远程服务器(为简洁起见,没有导入的 Kotlin 代码):
private val logger = KotlinLogging.logger {}
@Component
class Client(private val webClient: WebClient) {
companion object {
const val maxRetries = 2L
val firstBackOff = Duration.ofSeconds(5L)
val maxBackOff = Duration.ofSeconds(20L)
}
fun send(uri: URI, data: Data): Mono<Void> {
return webClient
.post()
.uri(uri)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(data)
.retrieve()
.toBodilessEntity()
.doOnSubscribe {
logger.info { "Calling backend, uri: $uri" }
}
.retryExponentialBackoff(maxRetries, firstBackOff, maxBackOff, jitter = false) {
logger.debug { "Call to $uri failed, will retry (#${it.iteration()} of max $maxRetries)" }
}
.doOnError {
logger.error { "Call to $uri with $maxRetries retries failed with $it" }
}
.doOnSuccess {
logger.info { "Call to $uri succeeded" }
}
.then()
}
}
Run Code Online (Sandbox Code Playgroud)
(它返回一个空值Mono,因为我们不期望得到答案,也不关心它。)
我想测试两种情况,其中一种让我头疼,即我想测试所有重试是否已被触发的情况。我们正在使用MockWebServer(https://github.com/square/okhttp/tree/master/mockwebserver)和StepVerifierreactor-test。(成功的测试很简单,不需要任何虚拟时间调度程序魔法,并且工作得很好。)以下是失败测试的代码:
@JsonTest
@ContextConfiguration(classes = [Client::class, ClientConfiguration::class])
class ClientITest @Autowired constructor(
private val client: Client
) {
lateinit var server: MockWebServer
@BeforeEach
fun `init mock server`() {
server = MockWebServer()
server.start()
}
@AfterEach
fun `shutdown server`() {
server.shutdown()
}
@Test
fun `server call is retried and eventually fails`() {
val data = Data()
val uri = server.url("/server").uri()
val responseStatus = HttpStatus.INTERNAL_SERVER_ERROR
repeat((0..Client.maxRetries).count()) {
server.enqueue(MockResponse().setResponseCode(responseStatus.value()))
}
StepVerifier.withVirtualTime { client.send(uri, data) }
.expectSubscription()
.thenAwait(Duration.ofSeconds(10)) // wait for the first retry
.expectNextCount(0)
.thenAwait(Duration.ofSeconds(20)) // wait for the second retry
.expectNextCount(0)
.expectErrorMatches {
val cause = it.cause
it is RetryExhaustedException &&
cause is WebClientResponseException &&
cause.statusCode == responseStatus
}
.verify()
// assertions
}
}
Run Code Online (Sandbox Code Playgroud)
我使用它是withVirtualTime因为我不希望测试花费近几秒钟的时间。问题是测试无限期地阻塞。这是(简化的)日志输出:
okhttp3.mockwebserver.MockWebServer : MockWebServer[51058] starting to accept connections
Calling backend, uri: http://localhost:51058/server
MockWebServer[51058] received request: POST /server HTTP/1.1 and responded: HTTP/1.1 500 Server Error
Call to http://localhost:51058/server failed, will retry (#1 of max 2)
Calling backend, uri: http://localhost:51058/server
MockWebServer[51058] received request: POST /server HTTP/1.1 and responded: HTTP/1.1 500 Server Error
Call to http://localhost:51058/server failed, will retry (#2 of max 2)
Run Code Online (Sandbox Code Playgroud)
如您所见,第一次重试有效,但第二次重试会阻塞。我不知道如何编写测试以免发生这种情况。更糟糕的是,客户端实际上会使用抖动,这将使时序难以预测。
即使重试次数较多,以下使用StepVerifier但不使用的测试也可以正常工作:WebClient
@Test
fun test() {
StepVerifier.withVirtualTime {
Mono
.error<RuntimeException>(RuntimeException())
.retryExponentialBackoff(5,
Duration.ofSeconds(5),
Duration.ofMinutes(2),
jitter = true) {
println("Retrying")
}
.then()
}
.expectSubscription()
.thenAwait(Duration.ofDays(1)) // doesn't matter
.expectNextCount(0)
.expectError()
.verify()
}
Run Code Online (Sandbox Code Playgroud)
有人可以帮助我修复测试,并且最好能解释一下出了什么问题吗?
这是虚拟时间和时钟操作方式的限制StepVerifier。这些thenAwait方法与底层调度不同步(例如作为操作的一部分发生的情况retryBackoff)。这意味着操作员在时钟已经提前一天的时间点提交重试任务。+ 1 day and 10 seconds因此,由于时钟为 ,因此安排了第二次重试+1 day。此后,时钟永远不会提前,因此永远不会向 发出附加请求MockWebServer。
您的情况变得更加复杂,因为涉及一个额外的组件,该组件MockWebServer仍然“实时”工作。尽管推进虚拟时钟是一个非常快的操作,但来自静态时钟的响应MockWebServer会通过套接字,因此对重试调度有一定的延迟,从测试编写的角度来看,这使得事情变得更加复杂。
要探索的一种可能的解决方案是在并行线程中外部化 的创建VirtualTimeScheduler和advanceTimeBy对, 的调用。mockServer.takeRequest()
| 归档时间: |
|
| 查看次数: |
3629 次 |
| 最近记录: |