Java中基于可变属性返回Mono的方法的线程安全

Mer*_*ous 2 java spring-boot project-reactor spring-webflux

在我的 Spring Boot 应用程序中,我有一个组件应该监视另一个外部系统的健康状态。该组件还提供了一个公共方法,反应链可以订阅该方法以等待外部系统启动。

@Component
public class ExternalHealthChecker {
  private static final Logger LOG = LoggerFactory.getLogger(ExternalHealthChecker.class);

  private final WebClient externalSystemWebClient = WebClient.builder().build(); // config omitted

  private volatile boolean isUp = true;
  private volatile CompletableFuture<String> completeWhenUp = new CompletableFuture<>();

  @Scheduled(cron = "0/10 * * ? * *")
  private void checkExternalSystemHealth() {
    webClient.get() //
        .uri("/health") //
        .retrieve() //
        .bodyToMono(Void.class) //
        .doOnError(this::handleHealthCheckError) //
        .doOnSuccess(nothing -> this.handleHealthCheckSuccess()) //
        .subscribe(); //
  }

  private void handleHealthCheckError(final Throwable error) {
    if (this.isUp) {
      LOG.error("External System is now DOWN. Health check failed: {}.", error.getMessage());
    }
    this.isUp = false;
  }

  private void handleHealthCheckSuccess() {
  // the status changed from down -> up, which has to complete the future that might be currently waited on  
  if (!this.isUp) {
      LOG.warn("External System is now UP again.");
      this.isUp = true;
      this.completeWhenUp.complete("UP");
      this.completeWhenUp = new CompletableFuture<>();
    }
  }


  public Mono<String> waitForExternalSystemUPStatus() {
    if (this.isUp) {
      LOG.info("External System is already UP!");
      return Mono.empty();
    } else {
      LOG.warn("External System is DOWN. Requesting process can now wait for UP status!");
      return Mono.fromFuture(completeWhenUp);
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

该方法waitForExternalSystemUPStatus是公共的,可以从许多不同的线程调用。其背后的想法是为应用程序中的一些反应通量链提供一种暂停其处理直到外部系统启动的方法。当外部系统出现故障时,这些链无法处理其元素。

someFlux
  .doOnNext(record -> LOG.info("Next element")
  .delayUntil(record -> externalHealthChecker.waitForExternalSystemUPStatus())
  ... // starting processing
Run Code Online (Sandbox Code Playgroud)

这里的问题是我无法真正理解代码的哪一部分需要同步。waitForExternalSystemUPStatus我认为多个线程同时调用不应该有问题,因为这个方法没有写任何东西。所以我觉得这个方法不需要同步。然而,用 注释的方法@Scheduled也将在它自己的线程上运行,并且实际上会写入 的值,并且还可能更改对新的、未完成的未来实例的isUp引用。completeWhenUp我已将这两个可变属性标记为volatile,因为通过阅读 Java 中的此关键字,我觉得它将有助于保证读取这两个值的线程看到最新值。但是,我不确定是否还需要synchronized在部分代码中添加关键字。我也不确定同步关键字是否与反应器代码配合得很好,我很难找到这方面的信息。也许还有一种方法可以以更完整、反应性的方式提供功能ExternalHealthChecker,但我想不出任何方法。

Mic*_*rry 5

我强烈建议不要采用这种方法。像这样的线程代码的问题是它变得非常难以遵循和推理。我认为你至少handleHealthCheckSuccess()需要同步和waitForExternalSystemUPStatus()引用你的字段的部分completeWhenUp,否则你可能会遇到竞争危险(只有一个写入它,但写入后可能会乱序读取) -但很可能我还遗漏了其他东西,如果是这样,它可能会显示为这些令人讨厌的“百万分之一”类型的错误之一,几乎不可能确定。

不过,应该有一种更可靠、更简单的方法来实现这一目标。我不使用 Spring 调度程序,而是在创建组件时创建一个通量,ExternalHealthChecker如下所示:

healthCheckStream = Flux.interval(Duration.ofMinutes(10))
        .flatMap(i ->
                webClient.get().uri("/health")
                        .retrieve()
                        .bodyToMono(String.class)
                        .map(s -> true)
                        .onErrorResume(e -> Mono.just(false)))
        .cache(1);
Run Code Online (Sandbox Code Playgroud)

...其中healthCheckStream是类型字段Flux<Boolean>。(请注意,它不需要是易失性的,因为您永远不会替换它,因此不存在跨线程担忧 - 它是同一个流,将根据运行状况检查状态每 10 分钟更新一次不同的结果,无论线程如何您将从以下位置访问它。)

这实质上每 10 分钟创建一个运行状况检查响应值流,始终缓存最新的响应,并将其转变为热源。这意味着“订阅之前不会发生任何事情”在这种情况下不适用 - Flux 将立即开始执行,并且任何线程上的任何新订阅者将始终获得最新结果,无论是通过还是失败。handleHealthCheckSuccess()handleHealthCheckError()isUp、 、completeWhenUp然后都是多余的,它们可以消失 - 然后你waitForExternalSystemUPStatus()就可以变成一行:

return healthCheckStream.filter(x -> x).next();
Run Code Online (Sandbox Code Playgroud)

...然后工作就完成了,您可以从任何地方调用它,并且Mono只有在系统启动时才会完成。