Flux 不会在“then”之前等待元素完成

Roi*_*eck 4 java netty spring-boot project-reactor spring-webflux

我无法理解这个问题,我不确定我做错了什么。

我想等待 Flux 结束然后Mono返回serverResponse

我已附上代码片段,它将doOnNext填充categoryIdToPrintRepository.

我已经查看了如何在通量结束后返回单声道,并发现了“then”,但在处理 onNextSite 之前仍然执行“then”方法,这会导致错误:

java.lang.IllegalArgumentException: 'producer' type is unknown to ReactiveAdapterRegistry
Run Code Online (Sandbox Code Playgroud)

我究竟做错了什么?

 public Mono<ServerResponse> retrieveCatalog(ServerRequest ignored) {
        return Mono.just("start").flatMap(id ->
                Flux.fromIterable(appSettings.getSites())
                        .subscribeOn(ForkJoinPoolScheduler.create("SiteCatalogScheduler"))
                        .doOnNext(this::onNextSite)
                        .then(Mono.from(ServerResponse.ok().body(categoryIdToPrintRepository.getSortedTreeValues(), String.class))));

    }

    private void onNextSite(Integer siteId) {
        IntStream.range(1, appSettings.getCatalogMaxValue()).parallel().forEach(catalogId -> {
            Optional<SiteCatalogCategoryDTO> cacheData =
                    siteCatalogCacheUseCaseService.getSiteCatalogResponseFromCache(siteId, catalogId);
            cacheData.ifPresentOrElse(siteCatalogCategoryDTO -> {/*do nothing already exist in cache*/},
                    () -> {
                    Mono<SiteCatalogCategoryDTO> catalogCategoryDTOMono = WebClient.create(getUri(siteId, catalogId))
                            .get().retrieve().bodyToMono(SiteCatalogCategoryDTO.class);
                    catalogCategoryDTOMono.subscribe(siteCatalogCategoryDTO ->
                            handleSiteServerResponse(siteCatalogCategoryDTO, siteId, catalogId));
            });
        });
    }


    private void handleSiteServerResponse(SiteCatalogCategoryDTO siteCatalogCategoryDTO,
                                          int siteId, int catalogId) {
        if (siteCatalogCategoryDTO.getResponseStatus().equals(ResponseStatus.SUCCESS))
            Flux.fromIterable(siteCatalogCategoryDTO.getMappingList())
                    .subscribe(mapSCC -> {
                        categoryIdToPrintRepository.insertIntoTree(mapSCC.getCategoryId(),
                                "Site " + siteId + " - Catalog " + catalogId + " is mapped to category " + "\"" +
                                        mapSCC.getCategoryName() + "\" (" + mapSCC.getCategoryId() + ")");
                        siteCatalogCacheUseCaseService.insertIntoSiteCatalogCache(siteId, catalogId, siteCatalogCategoryDTO);
                    });
    }
Run Code Online (Sandbox Code Playgroud)

Tho*_*olf 6

您在应用程序中做了一些不应该做的事情subscribe,并且您有 void 方法,除非在特定位置,否则不应在反应式编程中使用这些方法。

这是一些示例代码:


// Nothing will happen, we are not returning anything, we can't subscribe
private void doSomething() {
    Mono.just("Foo");
}

// complier error
doSomething().subscribe( ... );

Run Code Online (Sandbox Code Playgroud)

您的应用程序是publisher调用客户端,是订阅者,这就是为什么我们将 Mono 或 Flux 返回给调用客户端,它们subscribe

你已经这样解决了:

private void doSomething() {
    Mono.just("Foo").subscribe( ... );
}

doSomething();
Run Code Online (Sandbox Code Playgroud)

现在你订阅自己来让事情运行,这不是正确的方法,正如前面提到的,调用客户端是订阅者,而不是你。

正确方法:

private Mono<String> doSomething() {
    return Mono.just("Foo");
}

// This is returned out to the calling client, they subscribe
return doSomething();
Run Code Online (Sandbox Code Playgroud)

当 Mono/Flux 完成时,它将发出一个信号,该信号将触发链中的下一个、下一个和下一个。

所以我对你需要做的事情的看法如下:

  • 删除所有 subscribes,如果您想做一些事情,可以使用诸如 、 、 等功能,flatmap使mapdoOnSuccess一直保持完整到客户端。
  • 删除所有void函数,确保它们返回 aFlux或 a Mono,如果您不想返回Mono<Void>某些内容,请使用该函数返回 a Mono.empty(),以便链完整。

一旦你使用了 Mono/Flux,你就需要处理返回,以便其他人可以继续使用。

更新:

为了then触发,你必须返回一些东西,它会在前面的 mono/flux 完成时返回。

例子:

private Flux<String> doSomething() {
    return Flux.just("Foo", "Bar", "FooBar")
               .doOnNext(string -> {
                   return // return something
               });
}

// Ignore what was return from doSomething and return something else when the flux has completed (so only trigger on the completed signal from the flux)
return doSomething().then( ... );
Run Code Online (Sandbox Code Playgroud)

  • 阅读“then(Mono&lt;V&gt; other)”的文档,它指出“让这个 Flux 完成,然后播放来自提供的 Mono 的信号”,这意味着如果您在该 Flux 完成时返回“Flux”(无论您想要它做什么)然后它会返回你在“then”中放入的内容。 (3认同)