如何与 Project Reactor 并行调用两个或多个 Web 服务或 REST 并加入答案

Dan*_*tor 7 java spring spring-boot project-reactor

您好,我想知道如何在 pararelo 中调用两个或多个 Web 服务或 Rest 服务并编写对调用的响应。

我在网上找到了一些使用其他技术的例子,但我无法让它与反应器一起工作

// start task A asynchronously
CompletableFuture<ResponseA> futureA = asyncServiceA.someMethod(someParam);
// start task B asynchronously
CompletableFuture<ResponseB> futureB = asyncServiceB.someMethod(someParam);

CompletableFuture<String> combinedFuture = futureA
        .thenCombine(futureB, (a, b) -> a.toString() + b.toString());

// wait till both A and B complete
String finalValue = combinedFuture.join();
Run Code Online (Sandbox Code Playgroud)

///////////////////////////////////////////////// //////////////////////////////

static void Run()
{
    //Follow steps at this link for addding a reference to the necessary .NET library:
    //http://stackoverflow.com/questions/9611316/system-net-http-missing-from-    
    //namespace-using-net-4-5

    //Create an HTTP Client
    var client = new HttpClient();

    //Call first service
    var task1 = client.GetAsync("http://www.cnn.com");

    //Call second service
    var task2 = client.GetAsync("http://www.google.com");

    //Create list of all returned async tasks
    var allTasks = new List<Task<HttpResponseMessage>> { task1, task2 };

    //Wait for all calls to return before proceeding
    Task.WaitAll(allTasks.ToArray());

}
Run Code Online (Sandbox Code Playgroud)

Sim*_*slé 6

假设您需要访问 2 个服务,因此您需要 2 个基础WebClient(每个都配置了正确的基础 URL 和例如身份验证方案):

@Bean
public WebClient serviceAClient(String authToken) {
    return WebClient.builder()
            .baseUrl("http://serviceA.com/api/v2/")
            .defaultHeader(HttpHeaders.AUTHORIZATION, "Basic " + authToken)
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .build();
}

@Bean
public WebClient serviceBClient(String authToken): WebClient {
    return WebClient.builder()
            .baseUrl("https://api.serviceB.com/")
            .defaultHeader(HttpHeaders.AUTHORIZATION, "token " + authToken)
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .build();
}
Run Code Online (Sandbox Code Playgroud)

从那里,让我们假设您将这 2 个 webclients 注入您的控制器(作为合格的bean)。这是使用 Reactor 对两者进行联合调用的代码:

Mono<ResponseA> respA = webclientA.get()
                                 .uri("/sub/path/" + foo)
                                 .retrieve()
                                 .bodyToMono(ResponseA.class);
Mono<ResponseB> respB = webclientB.get()
                                 .uri("/path/for/b")
                                 .retrieve()
                                 .bodyToMono(ResponseB.class);
Mono<String> join = respA.zipWith(respB, (a, b) -> a.toString + b.toString);
return join;
Run Code Online (Sandbox Code Playgroud)

请注意 zip 函数可以从 2 个响应中产生更有意义的东西,例如业务对象。结果Mono<String>仅在某些内容订阅它时触发 2 个请求(在 Spring WebFlux 的情况下,如果您从控制器方法返回它,框架将执行此操作)。


pau*_*aul 4

如果你\xc2\xb4正在使用Spring Reactor,你需要的是操作符Zip,来运行你的进程并在完成后将它们压缩。

\n\n
/**\n * Zip operator execute the N number of Flux independently, and once all them are finished, results\n * are combined in TupleN object.\n */\n@Test\npublic void zip() {\n    Flux<String> flux1 = Flux.just("hello ");\n    Flux<String> flux2 = Flux.just("reactive");\n    Flux<String> flux3 = Flux.just(" world");\n    Flux.zip(flux1, flux2, flux3)\n            .map(tuple3 -> tuple3.getT1().concat(tuple3.getT2()).concat(tuple3.getT3()))\n            .map(String::toUpperCase)\n            .subscribe(value -> System.out.println("zip result:" + value));\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

您可以在此处查看有关反应式技术的更多信息https://github.com/politrons/reactive

\n