Spring Reactive - 收集一系列分页结果作为所有结果的 Mono

Ada*_*lik 2 java spring project-reactor spring-webflux

在我的 REST 服务中,我必须多次调用另一个 REST 服务才能获取结果列表的所有页面。该请求包含一个from字段,我需要随每个请求增加该字段。响应包含一个totalResults字段 - 当我阅读了所有结果时,我需要停止调用其他服务,收集所有调用的所有结果并生成一个Mono<List<Result>>响应。

这是我到目前为止所拥有的:

@Getter
public class Request {
    private int from;
    private int size = 1000;
    private String type;

    public Request(String type, int from) {
        this.type = type;
        this.from = from;
    }
}

@Getter
@Setter
public class Response {
    private Integer totalResults;
    private Integer size;
    private Integer from;
    private List<Result> results;
}

public Mono<List<Result>> findByType(String type) {
    return Flux.generate(
            () -> new Request(type, 0),
            (Request request, SynchronousSink<List<Result>> sink) -> {
                Response response = find(request).block();
                sink.next(response.getResults());
                int nextFrom = response.getFrom() + response.getSize();
                if (nextFrom >= response.getTotalResults()) {
                    sink.complete();
                }
                return new Request(type, nextFrom);
            })
            .flatMap(Flux::fromIterable)
            .collectList();
}

private Mono<Response> find(Request request) {
    return webClient
            .post()
            .uri("/search")
            .syncBody(request)
            .retrieve()
            .bodyToMono(Response.class);
}
Run Code Online (Sandbox Code Playgroud)

它在使用的测试中工作MockWebServerStepVerifier但在生产中失败

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
Run Code Online (Sandbox Code Playgroud)

我怎样才能以正确的反应方式做到这一点?

Kev*_*sey 6

在 Adam 的帮助下进行编辑,该expand功能修复了这个问题

public Mono<List<Result>> findByType(Request request) {
        return find(request)
                .expand(response -> {
                    int nextFrom = response.getFrom() + response.getSize();
                    if (nextFrom >= response.getTotalResults()) {
                        return Mono.empty();
                    }
                    return find(new Request(request.getType(), response.getFrom() + response.getSize()));
                })
                 .flatMap(response -> Flux.fromIterable(response.getResults()))
                 .collectList();;
    }

    private Mono<Response> find(Request request) {
        return webClient
                .post()
                .uri("/search")
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON)
                .syncBody(request)
                .retrieve()
                .bodyToMono(Response.class);
    }
Run Code Online (Sandbox Code Playgroud)