Roi*_*eck 4 java netty spring-boot project-reactor spring-webflux
我无法理解这个问题,我不确定我做错了什么。
我想等待 Flux 结束然后Mono
返回serverResponse
我已附上代码片段,它将doOnNext
填充categoryIdToPrintRepository
.
我已经查看了如何在通量结束后返回单声道,并发现了“then”,但在处理 onNextSite 之前仍然执行“then”方法,这会导致错误:
java.lang.IllegalArgumentException: 'producer' type is unknown to ReactiveAdapterRegistry
Run Code Online (Sandbox Code Playgroud)
我究竟做错了什么?
public Mono<ServerResponse> retrieveCatalog(ServerRequest ignored) {
return Mono.just("start").flatMap(id ->
Flux.fromIterable(appSettings.getSites())
.subscribeOn(ForkJoinPoolScheduler.create("SiteCatalogScheduler"))
.doOnNext(this::onNextSite)
.then(Mono.from(ServerResponse.ok().body(categoryIdToPrintRepository.getSortedTreeValues(), String.class))));
}
private void onNextSite(Integer siteId) {
IntStream.range(1, appSettings.getCatalogMaxValue()).parallel().forEach(catalogId -> {
Optional<SiteCatalogCategoryDTO> cacheData =
siteCatalogCacheUseCaseService.getSiteCatalogResponseFromCache(siteId, catalogId);
cacheData.ifPresentOrElse(siteCatalogCategoryDTO -> {/*do nothing already exist in cache*/},
() -> {
Mono<SiteCatalogCategoryDTO> catalogCategoryDTOMono = WebClient.create(getUri(siteId, catalogId))
.get().retrieve().bodyToMono(SiteCatalogCategoryDTO.class);
catalogCategoryDTOMono.subscribe(siteCatalogCategoryDTO ->
handleSiteServerResponse(siteCatalogCategoryDTO, siteId, catalogId));
});
});
}
private void handleSiteServerResponse(SiteCatalogCategoryDTO siteCatalogCategoryDTO,
int siteId, int catalogId) {
if (siteCatalogCategoryDTO.getResponseStatus().equals(ResponseStatus.SUCCESS))
Flux.fromIterable(siteCatalogCategoryDTO.getMappingList())
.subscribe(mapSCC -> {
categoryIdToPrintRepository.insertIntoTree(mapSCC.getCategoryId(),
"Site " + siteId + " - Catalog " + catalogId + " is mapped to category " + "\"" +
mapSCC.getCategoryName() + "\" (" + mapSCC.getCategoryId() + ")");
siteCatalogCacheUseCaseService.insertIntoSiteCatalogCache(siteId, catalogId, siteCatalogCategoryDTO);
});
}
Run Code Online (Sandbox Code Playgroud)
您在应用程序中做了一些不应该做的事情subscribe
,并且您有 void 方法,除非在特定位置,否则不应在反应式编程中使用这些方法。
这是一些示例代码:
// Nothing will happen, we are not returning anything, we can't subscribe
private void doSomething() {
Mono.just("Foo");
}
// complier error
doSomething().subscribe( ... );
Run Code Online (Sandbox Code Playgroud)
您的应用程序是publisher
调用客户端,是订阅者,这就是为什么我们将 Mono 或 Flux 返回给调用客户端,它们subscribe
。
你已经这样解决了:
private void doSomething() {
Mono.just("Foo").subscribe( ... );
}
doSomething();
Run Code Online (Sandbox Code Playgroud)
现在你订阅自己来让事情运行,这不是正确的方法,正如前面提到的,调用客户端是订阅者,而不是你。
正确方法:
private Mono<String> doSomething() {
return Mono.just("Foo");
}
// This is returned out to the calling client, they subscribe
return doSomething();
Run Code Online (Sandbox Code Playgroud)
当 Mono/Flux 完成时,它将发出一个信号,该信号将触发链中的下一个、下一个和下一个。
所以我对你需要做的事情的看法如下:
subscribes
,如果您想做一些事情,可以使用诸如 、 、 等功能,flatmap
使map
链doOnSuccess
一直保持完整到客户端。Flux
或 a Mono
,如果您不想返回Mono<Void>
某些内容,请使用该函数返回 a Mono.empty()
,以便链完整。一旦你使用了 Mono/Flux,你就需要处理返回,以便其他人可以继续使用。
更新:
为了then
触发,你必须返回一些东西,它会在前面的 mono/flux 完成时返回。
例子:
private Flux<String> doSomething() {
return Flux.just("Foo", "Bar", "FooBar")
.doOnNext(string -> {
return // return something
});
}
// Ignore what was return from doSomething and return something else when the flux has completed (so only trigger on the completed signal from the flux)
return doSomething().then( ... );
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
16776 次 |
最近记录: |