添加重试WebClient的所有请求

Kev*_*sey 8 project-reactor spring-webflux

我们有一个服务器来检索OAUTH令牌,并且通过WebClient.filter方法将oauth令牌添加到每个请求中,例如

webClient
                .mutate()
                .filter((request, next) -> tokenProvider.getBearerToken()
                        .map(token -> ClientRequest.from(request)
                                .headers(httpHeaders -> httpHeaders.set("Bearer", token))
                                .build()).flatMap(next::exchange))
                .build();
TokenProvider.getBearerToken returns Mono<String> since it is a webclient request (this is cached)
Run Code Online (Sandbox Code Playgroud)

我想要重试功能,发生401错误,将使令牌无效并再次尝试请求,我这样工作

webClient.post()
            .uri(properties.getServiceRequestUrl())
            .contentType(MediaType.APPLICATION_JSON)
            .body(fromObject(createRequest))
            .retrieve()
            .bodyToMono(MyResponseObject.class)
            .retryWhen(retryOnceOn401(provider))

private Retry<Object> retryOnceOn401(TokenProvider tokenProvider) {
        return Retry.onlyIf(context -> context.exception() instanceof WebClientResponseException && ((WebClientResponseException) context.exception()).getStatusCode() == HttpStatus.UNAUTHORIZED)
                .doOnRetry(objectRetryContext -> tokenProvider.invalidate());
    }
Run Code Online (Sandbox Code Playgroud)

有没有办法将其移至webClient.mutate()..... build()函数?这样所有请求都将具有此重试功能?

我尝试添加作为过滤器,但似乎没有用,例如

.filter(((request, next) -> next.exchange(request).retryWhen(retryOnceOn401(tokenProvider))))
Run Code Online (Sandbox Code Playgroud)

对解决此问题的最佳方法有何建议?问候

Kev*_*sey 11

我发现了这一点,在看到重试仅适用于异常后,webClient不会抛出异常,这很明显,因为clientResponse对象仅保存响应,只有在调用bodyTo时才会在http状态上抛出异常,因此可以解决此问题,可以模仿这种行为

@Bean(name = "retryWebClient")
    public WebClient retryWebClient(WebClient.Builder builder, TokenProvider tokenProvider) {
        return builder.baseUrl("http://localhost:8080")
                .filter((request, next) ->
                        next.exchange(request)
                            .doOnNext(clientResponse -> {
                                    if (clientResponse.statusCode() == HttpStatus.UNAUTHORIZED) {
                                        throw new RuntimeException();
                                    }
                            }).retryWhen(Retry.anyOf(RuntimeException.class)
                                .doOnRetry(objectRetryContext -> tokenProvider.expire())
                                .retryOnce())

                ).build();
    }
Run Code Online (Sandbox Code Playgroud)

编辑具有重复/重试的功能之一是,它不会更改原始请求,在我的情况下,我需要检索一个新的OAuth令牌,但是上面发送了相同的(过期的)令牌。我确实想出了一种使用交换过滤器执行此操作的方法,一旦OAuth密码流进入spring-security-2.0,我应该可以将其与AccessTokens等集成,但是与此同时

ExchangeFilterFunction retryOn401Function(TokenProvider tokenProvider) {
        return (request, next) -> next.exchange(request)
                .flatMap((Function<ClientResponse, Mono<ClientResponse>>) clientResponse -> {
                    if (clientResponse.statusCode().value() == 401) {
                        ClientRequest retryRequest = ClientRequest.from(request).header("Authorization", "Bearer " + tokenProvider.getNewToken().toString()).build();
                        return next.exchange(retryRequest);
                    } else {
                        return Mono.just(clientResponse);
                    }
                });
    }
Run Code Online (Sandbox Code Playgroud)


rom*_*ara 5

我能够通过 完全完成此任务ExchangeFilterFunction,而不必抛出异常或类似的操作。

最初让我困惑的是期望响应(Mono、Flux 等)的行为与从结果调用中获得的响应相同WebClient。当您使用 时WebClient,如果收到未经授权的消息,Mono 会出现“错误”,您可以通过类似 的方式处理它onErrorResume。然而,在 中ExchangeFilterFunction,如果调用next.exchange(ClientRequest),返回的 Mono 只是类型的常规成功值ClientResponse,即使返回的是未经授权的值。

因此,要处理它,您可以使用如下代码(其中令牌服务替换您的特定令牌处理代码):

public class OneRetryAuthExchangeFilterFunction implements ExchangeFilterFunction {

    private final ITokenService tokenService;

    @Override
    public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
        ClientRequest authenticatedRequest = applyAuthentication(request);

        return next.exchange(authenticatedRequest)
                .flatMap(response -> {
                    if (HttpStatus.UNAUTHORIZED.equals(response.statusCode())) {
                        tokenService.forceRefreshToken();

                        ClientRequest refreshedAuthenticatedTokenRequest = applyAuthentication(request);

                        return next.exchange(refreshedAuthenticatedTokenRequest);
                    }

                    return Mono.just(response);
                });
    }

    private ClientRequest applyAuthentication(ClientRequest request) {
        String authenticationToken = tokenService.getToken();

        return ClientRequest.from(request)
                .headers(headers -> headers.setBearerAuth(authenticationToken))
                .build();
    }
}
Run Code Online (Sandbox Code Playgroud)

WebClient然后,您可以通过以下方式配置您的:

WebClient.builder()
        .filter(new OneRetryAuthExchangeFilterFunction(tokenService))
        .build();
Run Code Online (Sandbox Code Playgroud)

并且该 WebClient 的所有用户都将通过对未经授权的响应进行一次重试来进行身份验证