Spring boot 反应式缓存

stu*_*ler 9 spring caching spring-boot project-reactor spring-webflux

在我的应用程序中,我使用 spring webflux,并使用 webclient 从某些第 3 方 API 检索详细信息。现在,我想将第一次 webClient 响应存储在内存缓存中,以便第二次我可以直接从缓存中获取这些响应。我正在尝试在内存缓存机制和“咖啡”中使用Spring boot。但没有一个能按预期工作。 应用程序.yml:

spring:
 cache:
  cache-names: employee
 caffiene:
  spec: maximumSize=200, expireAfterAccess=5m
Run Code Online (Sandbox Code Playgroud)

EmployeeApplication.java:

@SpringBootApplication
@EnableCaching
public class EmployeeApplication{
   public static void main(String[] args){
    
}
}
Run Code Online (Sandbox Code Playgroud)

EmployeeController.java: 它有一个休息端点employee/all,可以从第 3 方 Api 获取所有员工。 EmployeeService.java:

@Service
@Slf4j
public class EmployeeService{
  @Autowired
  private WebClient webClient;
  @Autowired
  private CacheManager cacheManager;
  @Cacheable("employee")
  public Mono<List<Employee>> getAllEmployee(){
    log.info("inside employee service {}");
    return webClient.get()
        .uri("/employees/")
        .retrieve()
        .bodyToMono(Employee.class);
}
}
Run Code Online (Sandbox Code Playgroud)

虽然我已经配置了缓存名称,但当我第二次点击该网址时,它正在调用服务方法。需要使用什么缓存机制来缓存 Mono 响应?请建议。

Ale*_*lex 5

有多种选项可以缓存反应式发布者。

  1. 使用响应式API在定义的持续时间内cache进行缓存Mono
employeeService.getAllEmployee()
    .cache(Duration.ofMinutes(60))
    .flatMap(employees -> {
        // process data
    })
Run Code Online (Sandbox Code Playgroud)
  1. 将外部缓存与咖啡因一起使用。

Caffeine 支持基于 CompletableFuture 的异步缓存,可以轻松适应 Reactive API。

AsyncLoadingCache<String, List<Employee>> cache = Caffeine.newBuilder()
    .buildAsync((tenant, executor) ->
            employeeService.getAllEmployee(tenant).toFuture()
    );


Mono<List<Employee>> getEmployee(String tenant) {
    return Mono.fromCompletionStage(clientCache.get(tenant));
}

Run Code Online (Sandbox Code Playgroud)
  1. 将外部缓存与 Guava 和CacheMono一起使用reactor-extra。如果您需要根据不同的输入缓存结果(例如多租户环境),则此选项更合适

更新CacheMono自reactor-extra 3.4.7以来已被弃用。更好地使用#2 将外部缓存与 Caffeine 一起使用。

这是番石榴的示例,但您可以对其进行调整CacheManager

Cache<String, List<Employee>> cache = CacheBuilder.newBuilder()
        .expireAfterWrite(cacheTtl)
        .build();


Mono<List<Employee>> getEmployee(String tenant) {
    return CacheMono.lookup(key -> Mono.justOrEmpty(cache.getIfPresent(key)).map(Signal::next), tenant)
            .onCacheMissResume(() -> employeeService.getAllEmployee(tenant))
            .andWriteWith((key, signal) -> Mono.fromRunnable(() ->
                            Optional.ofNullable(signal.get())
                                    .ifPresent(value -> cache.put(key, value))
                    )
            );
}
Run Code Online (Sandbox Code Playgroud)

  • 自 `reactor-extra` 3.4.7(2022 年 3 月 15 日发布)以来,`CacheMono` 已被弃用,并计划在 3.6.0 中删除:https://github.com/reactor/reactor-addons/issues/237 (3认同)