小编Kev*_*sey的帖子

添加重试WebClient的所有请求

我们有一个服务器来检索OAUTH令牌,并且通过WebClient.filter方法将oauth令牌添加到每个请求中,例如

webClient
                .mutate()
                .filter((request, next) -> tokenProvider.getBearerToken()
                        .map(token -> ClientRequest.from(request)
                                .headers(httpHeaders -> httpHeaders.set("Bearer", token))
                                .build()).flatMap(next::exchange))
                .build();
TokenProvider.getBearerToken returns Mono<String> since it is a webclient request (this is cached)
Run Code Online (Sandbox Code Playgroud)

我想要重试功能,发生401错误,将使令牌无效并再次尝试请求,我这样工作

webClient.post()
            .uri(properties.getServiceRequestUrl())
            .contentType(MediaType.APPLICATION_JSON)
            .body(fromObject(createRequest))
            .retrieve()
            .bodyToMono(MyResponseObject.class)
            .retryWhen(retryOnceOn401(provider))

private Retry<Object> retryOnceOn401(TokenProvider tokenProvider) {
        return Retry.onlyIf(context -> context.exception() instanceof WebClientResponseException && ((WebClientResponseException) context.exception()).getStatusCode() == HttpStatus.UNAUTHORIZED)
                .doOnRetry(objectRetryContext -> tokenProvider.invalidate());
    }
Run Code Online (Sandbox Code Playgroud)

有没有办法将其移至webClient.mutate()..... build()函数?这样所有请求都将具有此重试功能?

我尝试添加作为过滤器,但似乎没有用,例如

.filter(((request, next) -> next.exchange(request).retryWhen(retryOnceOn401(tokenProvider))))
Run Code Online (Sandbox Code Playgroud)

对解决此问题的最佳方法有何建议?问候

project-reactor spring-webflux

8
推荐指数
2
解决办法
5622
查看次数

将回调转换为反应式发布者 (Flux)

我正在使用第三方库来注册 MessageListener,当发生某些事件时,它们会调用注册的侦听器 onMessage 方法

public interface MessageListener {  
   // third party code, it auto-scans for all MessageListeners and registers them
    void onMessage(Message message);
}


public class SimpleMessageListener implements MessageListener {
   public void onMessage(Message message) {
      //do something non blocking
      //is it possible to 'transmit' to messagePublisher
}
   public Flux<Message> messagePublisher() {
       // a method to which to subscribeOn    
   }
}
Run Code Online (Sandbox Code Playgroud)

所以我的问题是将其转变为 Flux 的最佳方法是什么

最后我希望能够做这样的事情

messagePublisher().subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

************** 编辑 我的第一次尝试是这样的

private List<FluxSink<Message>> handlers = new ArrayList<>();
public void onMessage(Message message) {
   handlers.forEach(han …
Run Code Online (Sandbox Code Playgroud)

java project-reactor spring-webflux

6
推荐指数
1
解决办法
2099
查看次数

标签 统计

project-reactor ×2

spring-webflux ×2

java ×1