How to iterate over Mono<List<String>> calling Mono returning method

Mar*_*unt 5 java reactive-programming project-reactor spring-webflux

I'm attempting to achieve the following in the method below

  1. Get All Cars from Dealer X
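  2. Create a wrapper object that stores a set of all cars and another set of all manufactures
     2a. Populate the cars set with the cars obtained in step 1
  3. For each car, get all of its distinct manufactures
  4. Store all of the obtained manufactures in the wrapper object's manufactures set
  5. Return a Mono of the cars and manufactures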
Mono<CarAndManufactures> requestCarAndManufactures(Request req) {
final String dealerId = buildDealerId(req.getDealerRegion(), req.getDealerId());
final CarAndManufactures CarAndManufactures = new CarAndManufactures();
return webSocketClient.getCars(dealerId) //note #getCars returns a Mono
    .map(getCarsResponse -> getCarsResponse
      .getResult()
      .stream()
      .map(Car::getId)
      .collect(toSet()))
    .map(carIds -> {
      CarAndManufactures.setCars(carIds);
      return CarAndManufactures;
    })
    .flatMapMany(CarAndManufactures1 -> Flux.fromIterable(CarAndManufactures.getCars().keySet()))
    .collectList()
    .log("Existing cars")
    .flatMap(carIds -> { //This is the problem area
      carIds
          .stream()
          .map(carId -> {
            webSocketClient.getManufactures(carId) //Note getManufactures returns a Mono... This method does not look like it's ever called
                .map(getManufactureResponse -> getManufactureResponse
                    .getResult()
                    .stream()
                    .map(Manufacture::getId)
                    .collect(toSet()))
                .map(ManufactureIds -> {
                  CarAndManufactures.SetManufactures(ManufactureIds); //since the line commented on above is not called the Manufacture Set is empty
                  return CarAndManufactures;
                });
            return CarAndManufactures;
          });
          return just(CarAndManufactures);
        }
    )
    .log("Car And Manufactures"); 
}

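The manufactures set is always empty, and it doesn't look like webSocketClient.getManufactures(carId) is ever called. I thought I might be missing a .subscribe somewhere, but since this is consumed by a WebFlux controller, I don't think a #subscribe is needed anywhere.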

ker*_*ter 0

I know I'm very late, but maybe someone will find this question and see the answer. I hope it helps.

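The reason webSocketClient.getManufactures(carId) is never called in your code is simple: you put the call inside a plain .map() operator of the Stream API, so nobody ever subscribes to the publisher it returns. On top of that, the stream itself has no terminal operation, so the lambda passed to .map() never runs at all.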
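To make the Stream part of this concrete, here is a minimal JDK-only sketch. It assumes nothing about the real webSocketClient: lookup is a hypothetical stand-in for getManufactures, and the counter only records how often the mapping lambda runs. A java.util.stream pipeline without a terminal operation is built but never executed.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class LazyStreamDemo {

    // Counts how many times the mapping lambda actually runs.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for a remote call such as getManufactures(carId).
    static String lookup(String carId) {
        calls.incrementAndGet();
        return "manufacture-of-" + carId;
    }

    // Mirrors the shape in the question: stream().map(...) with no terminal operation.
    static int withoutTerminalOperation(List<String> carIds) {
        calls.set(0);
        carIds.stream().map(LazyStreamDemo::lookup); // pipeline is built, never consumed
        return calls.get();
    }

    // Same pipeline, but collect() forces the lambda to run once per element.
    static int withTerminalOperation(List<String> carIds) {
        calls.set(0);
        carIds.stream().map(LazyStreamDemo::lookup).collect(Collectors.toList());
        return calls.get();
    }

    public static void main(String[] args) {
        List<String> carIds = List.of("MUSTANG", "FOCUS", "FUSION");
        System.out.println(withoutTerminalOperation(carIds)); // prints 0
        System.out.println(withTerminalOperation(carIds));    // prints 3
    }
}
```

Even with a terminal operation, the Mono returned by each call would still need to be part of the subscribed chain, so adding collect() alone would not fix the original method.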

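The whole reactive chain should be a single piece, without any "breaks". After getting the car ids, pass them into a .flatMap(). Inside that .flatMap() you should create another Flux of the ids, put another .flatMap() on that Flux to request the manufactures, and finally collect them into the result set.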
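The shape described above (fan out one request per id, merge the results, and return the composed result instead of dropping it) can be sketched without Reactor. This is only an analogy using the JDK's CompletableFuture, not Reactor code: thenCompose plays the role of flatMap, and manufacturesOf, allManufactures and pipeline are hypothetical names. Note one important difference: a CompletableFuture starts running eagerly, while a Mono does nothing until something subscribes to it.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class UnbrokenChainDemo {

    // Hypothetical async lookup standing in for getManufactures(carId).
    static CompletableFuture<Set<String>> manufacturesOf(String carId) {
        return CompletableFuture.supplyAsync(
                () -> Set.copyOf(List.of("MF-" + carId, "MF-COMMON")));
    }

    // Starts one async call per id and merges all results into one set.
    // The merged result is returned as a single future, so nothing is dropped.
    static CompletableFuture<Set<String>> allManufactures(List<String> carIds) {
        List<CompletableFuture<Set<String>>> pending = carIds.stream()
                .map(UnbrokenChainDemo::manufacturesOf)
                .collect(Collectors.toList()); // terminal operation: every call is started
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
                .thenApply(done -> pending.stream()
                        .map(CompletableFuture::join) // does not block: all futures are complete
                        .flatMap(Set::stream)
                        .collect(Collectors.toSet()));
    }

    // Composes the async source of ids with the fan-out above.
    static CompletableFuture<Set<String>> pipeline(CompletableFuture<List<String>> carIds) {
        return carIds.thenCompose(UnbrokenChainDemo::allManufactures);
    }

    public static void main(String[] args) {
        Set<String> result =
                pipeline(CompletableFuture.completedFuture(List.of("MUSTANG", "FOCUS"))).join();
        System.out.println(result.size()); // prints 3
    }
}
```

The point of the analogy is the return value: each step hands its asynchronous result to the next step, so the caller receives one composed result that already contains the manufactures.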

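Also, you shouldn't declare final CarAndManufactures CarAndManufactures = new CarAndManufactures(); up front and then try to populate that object inside the reactive chain. Doing so mixes imperative code with reactive code, which is the wrong way to use reactive programming.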

You should stay inside your reactive chain the whole time.

So, implement your method as follows (I also made the code "better" and cleaner):

@Test
public void carsAndManufacturesTest() {
    String dealerId = "test dealer";

    client.getCars(dealerId)                                  // request Mono<CarsResponse>
            .map(CarsResponse::getResult)                     // getting List<Cars> from response
            .map(cars ->                                      // getting ids of cars and collect them to Set<String>
                    cars.stream()
                            .map(Car::getId)
                            .collect(Collectors.toSet())
            )
            .flatMap(carsIds ->                               // creating another publisher that will fetch manufactures and build the CarAndManufactures
                    this.fetchManufacturesIds(carsIds)        // fetching Set<String> manufactures for all the carsIds
                            .map(manufacturesIds ->           // we're done here! we have both carsIds and manufactureIds!
                                    new CarAndManufactures(   // creating desired object from both carsIds and their manufacturesIds
                                            carsIds,
                                            manufacturesIds)
                            )
            )
            .doOnNext(carAndManufactures -> log.info(carAndManufactures.toString()))
            .subscribe();
}

/**
 * Fetches all the manufactures ids of given cars ids
 *
 * @param carsIds ids of cars
 * @return Mono with all the manufactures ids of given cars ids
 */
public Mono<Set<String>> fetchManufacturesIds(Set<String> carsIds) {

    return Flux.fromIterable(carsIds)                       // creating flux of given cars ids
            .flatMap(client::getManufactures)               // request Mono<ManufactureResponse> for car id
            .map(ManufactureResponse::getResult)            // getting List<Manufacture>
            .map(manufactures ->                            // getting ids of manufactures and collect them to Set
                    manufactures.stream()
                            .map(Manufacture::getId)
                            .collect(Collectors.toSet())
            )
            .collectList()                                  // collecting all the sets of manufactures ids, here we get List<Set<String>>
            .map(list ->                                    // flatting all the sets to one set, eventually we get here Set<String>
                    list.stream()
                            .flatMap(Collection::stream)
                            .collect(Collectors.toSet())
            );
}

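For those who want to test this code, I'll leave the class definitions here. Then create the client in the code above with private Client client = new Client(); (as a field of the class that contains these two methods).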

Classes:

/*
    ================== Example classes for those who want to test this code ===================
 */
@Slf4j
class Client {

    public Mono<CarsResponse> getCars(String dealerId) {
        log.info("Received request to fetch cars by dealer id: {}", dealerId);

        List<Car> cars =
                List.of(new Car("MUSTANG"),
                        new Car("FOCUS"),
                        new Car("FUSION")
                );
        return Mono.just(new CarsResponse(cars));
    }

    public Mono<ManufactureResponse> getManufactures(String carId) {
        log.info("Received request to fetch manufactures by car id: {}", carId);

        List<Manufacture> manufactures =
                List.of(new Manufacture("MF BUFFALO"),
                        new Manufacture("MF CHANGAN"),
                        new Manufacture("MF CHICAGO"),
                        new Manufacture("MF DEARBORN")
                );
        return Mono.just(new ManufactureResponse(manufactures));
    }
}

/*
    ================== Data classes ===================
 */
@Data
@AllArgsConstructor
class CarsResponse {

    private List<Car> result;
}

@Data
@AllArgsConstructor
class ManufactureResponse {

    List<Manufacture> result;
}

@Data
@AllArgsConstructor
class CarAndManufactures {

    private Set<String> cars;
    private Set<String> manufactures;
}

@Data
@AllArgsConstructor
class Car {

    private String id;
}

@Data
@AllArgsConstructor
class Manufacture {

    private String id;
}