如何与项目reactor同步刷新访问令牌

Ros*_*sso 5 project-reactor spring-webflux

我正在访问使用 OpenID 连接保护的外部 REST API。我必须运行一个在完成之前多次调用 api 的作业。该作业可以使用多个并发线程并行执行。因此,我使用为每个作业实例化的 api 访问服务:

private AccessAndRefresTokens tokens; // I initially get that from somewhere else

public Mono<Result> callTheApi(){
  return createWebClientWith(tokens.accessToken).executeRequest();
}

public Mono<Result> callOtherApiFunction(){
  return createWebClientWith(tokens.accessToken).executeRequest();
}

public Mono<Result> callYetAnotherApiFunction(){
  return createWebClientWith(tokens.accessToken).executeRequest();
}

Run Code Online (Sandbox Code Playgroud)

当我的作业执行时,访问令牌可能会在两次 api 调用之间过期。为了防止这种情况发生,我喜欢在每次请求之前检查访问令牌的有效性,并在必要时刷新它。

我的第一个想法是在 flatMap 运算符内进行有效性检查:

Mono.just(tokens).flatMap(tokens -> {
  if(accessTokenExpired){
    return refreshAccessToken().doOnNext(refreshedTokens -> tokens = refreshedTokens);
  }else{
    return Mono.just(tokens.accessToken);
  }
}).flatMap(accessToken -> createWebClientWith(accessToken));
Run Code Online (Sandbox Code Playgroud)

然而,在我看来,如果多个线程访问服务并且刷新尚未完成,这将多次触发刷新。因此,我最终会在很短的时间内多次刷新访问令牌,这可能会由于速率限制而失败。

我对整个反应性事物很陌生,我认为使用synchronized块不是一个理想的选择。所以我试图用反应堆找出一些东西Processor,但我找不到令人满意的解决方案。

那么有没有办法确保访问令牌仅被引用一次?以及如何在不使用阻塞代码的情况下实现这一目标?