Spring Webclient重试并在所有重试都用尽时执行代码

Sar*_*oev 6 spring-boot project-reactor spring-webflux

我有一个 webhook 服务,可以将事件发送到不同的源 (URL)。按照设计,请求超时时间为10s,如果失败,则重试发送3次。如果所有重试都失败,则必须执行代码以禁用数据库中的该 URL。

到目前为止,我已成功重试并延迟了 5 秒。但是,我不确定失败后如何执行代码。

    try{

          String body = objectMapper.writeValueAsString(webhookDTO);

                webClient.post()
                        .uri(webhook.getUrl())
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(body)
                        .exchange()
                        .timeout(Duration.ofSeconds(5))
                        .retryWhen(Retry.backoff(3, Duration.ofSeconds(5))
                                .jitter(0d)
                                .doAfterRetry(retrySignal -> {
                                    logger.info("Retried " + retrySignal.totalRetries());
                                })
                                .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) 
                                                    -> new WebhookTimeoutException()))
                        .doOnSuccess(clientResponse -> {
                            logger.info("Event is received by " + client);
                        })
                        .subscribe();
            } catch (Exception e) {
                logger.error("Error on webhook dispatcher: ", e);
            }
Run Code Online (Sandbox Code Playgroud)

谁能举一些例子来说明如何做到这一点?

vin*_*ins 7

你快到了!doOnError只需按照此处所示使用即可。这里的想法是,一旦所有尝试失败后,你就抛出WebhookTimeoutException. 仅当抛出错误并更新数据库时才调用 doOnError 。异常类是可选的。你可以忽略这一点。

webClient.post()
        .uri(webhook.getUrl())
        .contentType(MediaType.APPLICATION_JSON)
        .bodyValue(body)
        .exchange()
        .timeout(Duration.ofSeconds(5))
        .retryWhen(Retry.backoff(3, Duration.ofSeconds(5))
                .jitter(0d)
                .doAfterRetry(retrySignal -> {
                    logger.info("Retried " + retrySignal.totalRetries());
                })
                .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) 
                                    -> new WebhookTimeoutException()))
        .doOnSuccess(clientResponse -> {
            logger.info("Event is received by " + client);
        })
        .doOnError(WebhookTimeoutException.class, (msg) -> {
            System.out.println("Message :: " + msg);
            // here update the DB
            dbRepository.save(...);
        })        
        .subscribe();
Run Code Online (Sandbox Code Playgroud)