Mar*_*unt 5 java reactive-programming project-reactor spring-webflux
I'm attempting to achieve the following in the method below
- Get All Cars from Dealer X
- Create wrapper object that stores a set of all cars and another set of all manufactures 2a. Populate Cars set with the cars obtained in Step 1
- 对于每辆汽车,获得他们所有的独立制造商
- 将所有获得的制造商存储到包装对象制造商集合中
- 返回汽车和制造商的单声道
Mono<CarAndManufactures> requestCarAndManufactures(Request req) {
final String dealerId = buildDealerId(req.getDealerRegion(), req.getDealerId());
final CarAndManufactures CarAndManufactures = new CarAndManufactures();
return webSocketClient.getCars(dealerId) //note #getCars returns a Mono
.map(getCarsResponse -> getCarsResponse
.getResult()
.stream()
.map(Car::getId)
.collect(toSet()))
.map(carIds -> {
CarAndManufactures.setCars(carIds);
return CarAndManufactures;
})
.flatMapMany(CarAndManufactures1 -> Flux.fromIterable(CarAndManufactures.getCars().keySet()))
.collectList()
.log("Existing cars")
.flatMap(carIds -> { //This is the problem area
carIds
.stream()
.map(carId -> {
webSocketClient.getManufactures(carId) //Note getManufactures returns a Mono... This method does look like its ever called
.map(getManufactureResponse -> getManufactureResponse
.getResult()
.stream()
.map(Manufacture::getId)
.collect(toSet()))
.map(ManufactureIds -> {
CarAndManufactures.SetManufactures(ManufactureIds); //since the line commented on above is not called the Manufacture Set is empty
return CarAndManufactures;
});
return CarAndManufactures;
});
return just(CarAndManufactures);
}
)
.log("Car And Manufactures");
Run Code Online (Sandbox Code Playgroud)
}
制造商集总是空的,看起来不像 webSocketClient.getManufactures(carId) 被调用过。以为我可能会在某些地方丢失 .subscribe ,但由于 webflux 控制器正在使用它,我认为任何地方都不需要 #subscribes
我知道我已经太晚了,但也许有人会找到这个问题并看到答案,我希望这会有所帮助。
webSocketClient.getManufactures(carId代码中从未调用 ) 的原因很明显:自从您将此调用置于.map()Stream API 的简单运算符以来,没有人订阅此发布者。
整个反应链应该是整体的,没有任何“中断”。获取汽车的 id 并将它们放入 后.flatMap(),.flatMap()您应该在其中声明另一个Fluxid,然后在其中Flux放置另一个.flatMap()来请求制造商,最后将它们收集到结果集中。
另外,您不应该声明final CarAndManufactures CarAndManufactures = new CarAndManufactures();隐式,然后尝试在反应链中填充该对象。这样做会将命令式代码与反应式代码混合在一起,这是使用反应式的错误方法。
你应该一直留在你的反应链中。
所以,实现你的方法如下:(我还使代码“更好”和更干净):
@Test
public void carsAndManufacturesTest() {
String dealerId = "test dealer";
client.getCars(dealerId) // request Mono<CarsResponse>
.map(CarsResponse::getResult) // getting List<Cars> from response
.map(cars -> // getting ids of cars and collect them to Set<String>
cars.stream()
.map(Car::getId)
.collect(Collectors.toSet())
)
.flatMap(carsIds -> // creating another publisher that will fetch manufactures and build the CarAndManufactures
this.fetchManufacturesIds(carsIds) // fetching Set<String> manufactures for all the carsIds
.map(manufacturesIds -> // we're done here! we have both carsIds and manufactureIds!
new CarAndManufactures( // creating desired object from both carsIds and their manufacturesIds
carsIds,
manufacturesIds)
)
)
.doOnNext(carAndManufactures -> log.info(carAndManufactures.toString()))
.subscribe();
}
/**
* Fetches all the manufactures ids of given cars ids
*
* @param carsIds ids of cars
* @return Mono with all the manufactures ids of given cars ids
*/
public Mono<Set<String>> fetchManufacturesIds(Set<String> carsIds) {
return Flux.fromIterable(carsIds) // creating flux of given cars ids
.flatMap(client::getManufactures) // request Mono<ManufactureResponse> for car id
.map(ManufactureResponse::getResult) // getting List<Manufacture>
.map(manufactures -> // getting ids of manufactures and collect them to Set
manufactures.stream()
.map(Manufacture::getId)
.collect(Collectors.toSet())
)
.collectList() // collecting all the sets of manufactures ids, here we get List<Set<String>>
.map(list -> // flatting all the sets to one set, eventually we get here Set<String>
list.stream()
.flatMap(Collection::stream)
.collect(Collectors.toSet())
);
}
Run Code Online (Sandbox Code Playgroud)
对于那些想要测试此代码的人,我将在这里留下类的规范。然后在上面的代码中创建客户端private Client client = new Client();(作为放置这两个 Merhod 的类的字段)
课程:
/*
================== Classes examples for those who wants to test this code ===================
*/
@Slf4j
class Client {
public Mono<CarsResponse> getCars(String dealerId) {
log.info("Received request to fetch cars by dealer id: {}", dealerId);
List<Car> cars =
List.of(new Car("MUSTANG"),
new Car("FOCUS"),
new Car("FUSION")
);
return Mono.just(new CarsResponse(cars));
}
public Mono<ManufactureResponse> getManufactures(String carId) {
log.info("Received request to fetch manufactures by car id: {}", carId);
List<Manufacture> manufactures =
List.of(new Manufacture("MF BUFFALO"),
new Manufacture("MF CHANGAN"),
new Manufacture("MF CHICAGO"),
new Manufacture("MF DEARBORN")
);
return Mono.just(new ManufactureResponse(manufactures));
}
}
/*
================== Data classes ===================
*/
@Data
@AllArgsConstructor
class CarsResponse {
private List<Car> result;
}
@Data
@AllArgsConstructor
class ManufactureResponse {
List<Manufacture> result;
}
@Data
@AllArgsConstructor
class CarAndManufactures {
private Set<String> cars;
private Set<String> manufactures;
}
@Data
@AllArgsConstructor
class Car {
private String id;
}
@Data
@AllArgsConstructor
class Manufacture {
private String id;
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3975 次 |
| 最近记录: |