反应式编程:Spring WebFlux:如何构建微服务调用链?

use*_*011 3 reactive-programming spring-boot reactive spring-webflux spring-webclient

Spring Boot应用:

a@RestController接收以下有效负载:

{
  "cartoon": "The Little Mermaid",
  "characterNames": ["Ariel", "Prince Eric", "Sebastian", "Flounder"]
}
Run Code Online (Sandbox Code Playgroud)

我需要按以下方式处理它:

  1. 获取每个角色名称的唯一 ID:对“卡通人物”微服务进行 HTTP 调用,该服务按名称返回 id
  2. 转换控制器接收到的数据:将角色名称替换为上一步从“卡通人物”微服务接收到的适当 ID。 { "cartoon": "The Little Mermaid", "characterIds": [1, 2, 3, 4] }

  3. 使用转换后的数据向“cartoon-db”微服务发送 HTTP POST 请求。

  4. 将“cartoon-db”的响应映射到作为控制器返回值的内部表示。

我遇到的问题:

Reactive Programming我需要使用(非阻塞\异步处理)和Spring WebFluxMono| Flux)的范例来实现所有这些步骤Spring Reactive WebClient- 但我对该堆栈的经验为零,试图尽可能多地阅读它,再加上谷歌搜索很多但是仍然有很多未解答的问题,例如:

Q1 . 我已经配置了响应式 webClient,向“卡通人物”微服务发送请求:

      public Mono<Integer> getCartoonCharacterIdbyName(String characterName) {
    return WebClient.builder().baseUrl("http://cartoon-characters").build()
        .get()
        .uri("/character/{characterName}", characterName)
        .retrieve()
        .bodyToMono(Integer.class);
  }
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,我有一个卡通人物名称列表,对于每个角色我需要调用getCartoonCharacterIdbyName(String name)方法,我不确定串联调用它的正确选项,相信正确的选项:并行执行。

写了如下方法:

  public List<Integer> getCartoonCharacterIds(List<String> names) {
Flux<Integer> flux = Flux.fromStream(names.stream())
    .flatMap(this::getCartoonCharacterIdbyName);

return StreamSupport.stream(flux.toIterable().spliterator(), false)
    .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

}

但我怀疑这段代码是否并行WebClient执行,并且代码调用flux.toIterable()会阻塞线程,因此通过此实现,我失去了非阻塞机制。

我的假设正确吗?

我需要如何重写它才能具有并行性和非阻塞性?

Q2。 从技术上讲,是否可以以响应式方式转换控制器接收到的输入数据(我的意思是用 ids 替换名称):当我们使用Flux<Integer>characterIds 进行操作时,而不是使用List<Integer>characterIds 进行操作时?

Q3。是否有可能不仅获得转换后的 Data 对象,而且在步骤 2之后获得 Mono<> ,并可以在步骤 3中被另一个 WebClient 使用?

K.N*_*las 6

实际上这是一个很好的问题,因为理解 WebFlux 或项目反应器框架在链接微服务时需要几个步骤。

首先是认识到 aWebClient应该接收一个发布者并返回一个发布者。将其推断为 4 个不同的方法签名以帮助思考。

  • 单声道 -> 单声道
  • 助焊剂 -> 助焊剂
  • 单声道 -> 助焊剂
  • 助焊剂 -> 单声道

当然,在所有情况下,它都只是发布者->发布者,但在您更好地理解事情之前先保留它。前两个是显而易见的,您只需用来.map(...)处理流中的对象,但您需要学习如何处理后两个。如上所述,从 Flux->Mono 可以使用.collectList(),也可以使用.reduce(...)。从 Mono->Flux 似乎通常是通过.flatMapMany.flatMapIterable或 的一些变体来完成的。可能还有其他技术。您永远不应该在任何 WebFlux 代码中使用.block(),如果尝试这样做,通常您会收到运行时错误。

在你的例子中你想去

  • (单声道->助焊剂)->(助焊剂->助焊剂)->(助焊剂->助焊剂)

正如你所说,你想要

  • 单声道->助焊剂->助焊剂

第二部分是了解链接流。你可以做

  • p3(p2(p1(对象)));

这会链接 p1->p2->p3,但我总是发现创建一个“服务层”更容易理解。

  • o2 = p1(对象);
  • o3 = p2(o2);
  • 结果 = p3(o3);

这段代码更容易阅读和维护,并且随着成熟度的提高,您会逐渐理解该语句的价值。

我对你的例子遇到的唯一问题是将 a Flux<String>withWebClient作为 a @RequestBody。不起作用。请参阅WebClient bodyToFlux(String.class) 了解字符串列表不分隔各个值。除此之外,这是一个非常简单的应用程序。你调试的时候会发现,它先到了.subscribe(System.out::println)line,再到line Flux<Integer> ids = mapNamesToIds(fn)。这是因为流程是在执行之前设置的。需要一段时间才能理解这一点,但这是项目反应器框架的重点。

@SpringBootApplication
@RestController
@RequestMapping("/demo")
public class DemoApplication implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    Map<Integer, CartoonCharacter> characters;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        String[] names = new String[] {"Ariel", "Prince Eric", "Sebastian", "Flounder"};
        characters = Arrays.asList( new CartoonCharacter[] {
                new CartoonCharacter(names[0].hashCode(), names[0], "Mermaid"), 
                new CartoonCharacter(names[1].hashCode(), names[1], "Human"), 
                new CartoonCharacter(names[2].hashCode(), names[2], "Crustacean"), 
                new CartoonCharacter(names[3].hashCode(), names[3], "Fish")} 
        )
        .stream().collect(Collectors.toMap(CartoonCharacter::getId, Function.identity()));
        // TODO Auto-generated method stub
        CartoonRequest cr = CartoonRequest.builder()
        .cartoon("The Little Mermaid")
        .characterNames(Arrays.asList(names))
        .build();
        thisLocalClient
            .post()
            .uri("cartoonDetails")
            .body(Mono.just(cr), CartoonRequest.class)
            .retrieve()
            .bodyToFlux(CartoonCharacter.class)
            .subscribe(System.out::println);
    }

    @Bean
    WebClient localClient() {
        return WebClient.create("http://localhost:8080/demo/");
    }

    @Autowired
    WebClient thisLocalClient;

    @PostMapping("cartoonDetails")
    Flux<CartoonCharacter> getDetails(@RequestBody Mono<CartoonRequest> cartoonRequest) {
        Flux<StringWrapper> fn = cartoonRequest.flatMapIterable(cr->cr.getCharacterNames().stream().map(StringWrapper::new).collect(Collectors.toList()));
        Flux<Integer> ids = mapNamesToIds(fn);
        Flux<CartoonCharacter> details = mapIdsToDetails(ids);
        return details;
    }
    //  Service Layer Methods
    private Flux<Integer> mapNamesToIds(Flux<StringWrapper> names) {
        return thisLocalClient
            .post()
            .uri("findIds")
            .body(names, StringWrapper.class)
            .retrieve()
            .bodyToFlux(Integer.class);
    }
    private Flux<CartoonCharacter> mapIdsToDetails(Flux<Integer> ids) {
        return thisLocalClient
            .post()
            .uri("findDetails")
            .body(ids, Integer.class)
            .retrieve()
            .bodyToFlux(CartoonCharacter.class);
    }
    // Services
    @PostMapping("findIds")
    Flux<Integer> getIds(@RequestBody Flux<StringWrapper> names) {
        return names.map(name->name.getString().hashCode());
    }
    @PostMapping("findDetails")
    Flux<CartoonCharacter> getDetails(@RequestBody Flux<Integer> ids) {
        return ids.map(characters::get);
    }
}
Run Code Online (Sandbox Code Playgroud)

还:

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class StringWrapper {
    private String string;
}
@Data
@Builder
public class CartoonRequest {
    private String cartoon;
    private List<String> characterNames;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CartoonCharacter {
    Integer id;
    String name;
    String species;
}
Run Code Online (Sandbox Code Playgroud)