在 Spring WebFlux Web 应用程序中缓存来自 WebClient 调用的 Mono 的结果

mah*_*nhz 10 project-reactor spring-webflux

我正在寻找缓存Mono(仅当它成功时)这是 WebClient 调用的结果。

通过阅读项目反应器插件文档,我不觉得CacheMono很合适,因为它也缓存了我不想要的错误。

因此,CacheMono我没有使用,而是执行以下操作:

Cache<MyRequestObject, Mono<MyResponseObject>> myCaffeineCache = 
    Caffeine.newBuilder()
            .maximumSize(100)
            .expireAfterWrite(Duration.ofSeconds(60))
            .build();

MyRequestObject myRequestObject = ...;

Mono<MyResponseObject> myResponseObject = myCaffeineCache.get(myRequestObject,
    requestAsKey -> WebClient.create()
                             .post()
                             .uri("http://www.example.com")
                             .syncBody(requestAsKey)
                             .retrieve()
                             .bodyToMono(MyResponseObject.class)
                             .cache()
                             .doOnError(t -> myCaffeineCache.invalidate(requestAsKey)));
Run Code Online (Sandbox Code Playgroud)

在这里,我调用缓存Mono,然后将其添加到咖啡因缓存中。

任何错误都会输入doOnError以使缓存无效。

这是缓存MonoWebClient 响应的有效方法吗?

Bri*_*zel 10

这是实际上允许您调用非反应性库并使用反应性类型包装它们的极少数用例之一,并在诸如 的副作用运算符中完成处理doOnXYZ,因为:

  • 咖啡因是内存中的缓存,所以据我所知不涉及 I/O
  • 缓存通常不提供关于缓存值的强有力的保证(它非常“一劳永逸”)

在这种情况下,您可以查询缓存以查看是否存在缓存版本(包装它并立即返回),并在doOn运算符中缓存成功的真实响应,如下所示:

public class MyService {

    private WebClient client;

    private Cache<MyRequestObject, MyResponseObject> myCaffeineCache;

    public MyService() {
        this.client = WebClient.create();
        this.myCaffeineCache = Caffeine.newBuilder().maximumSize(100)
          .expireAfterWrite(Duration.ofSeconds(60)).build();
    }

    public Mono<MyResponseObject> fetchResponse(MyRequestObject request) {

        MyResponseObject cachedVersion = this.myCaffeineCache.get(myRequestObject);
        if (cachedVersion != null) {
           return Mono.just(cachedVersion);
        } else {
           return this.client.post()
                         .uri("http://www.example.com")
                         .syncBody(request.getKey())
                         .retrieve()
                         .bodyToMono(MyResponseObject.class)
                         .doOnNext(response -> this.myCaffeineCache.put(request.getKey(), response));
    }
}
Run Code Online (Sandbox Code Playgroud)

请注意,我不会在这里缓存反应类型,因为一旦缓存返回值,就不会涉及 I/O 或背压。相反,订阅和其他反应流约束使事情变得更加困难。

此外,您对cache运营商的看法是正确的,因为它不是关于缓存值本身,而是关于重播其他订阅者发生的事情。我相信cacheandreplay运算符实际上是Flux.


Ale*_*kin 6

实际上,您不必使用 CacheMono 保存错误。

private Cache<MyRequestObject, MyResponseObject> myCaffeineCache;

...

Mono<MyResponseObject> myResponseObject =
        CacheMono.lookup(key -> Mono.justOrEmpty(myCaffeineCache.getIfPresent(key))
                .map(Signal::next), myRequestObject)
                .onCacheMissResume(() -> /* Your web client or other Mono here */)
                .andWriteWith((key, signal) -> Mono.fromRunnable(() ->
                        Optional.ofNullable(signal.get())
                                .ifPresent(value -> myCaffeineCache.put(key, value))));
Run Code Online (Sandbox Code Playgroud)

当您切换到外部缓存时,这可能很有用。不要忘记为外部缓存使用反应式客户端。