在反应流上递归加载和映射

dtr*_*unk 6 java project-reactor spring-webflux

假设我们有一个商店、一个目录和类别。每个商店都可以有目录,每个类别可以有子类别。

这些文件来自 MongoDB:

@Data
@Document
@NoArgsConstructor
@AllArgsConstructor
public class Catalog {
    @Id
    private String id;
    private Set<String> stores;
    @Transient
    private List<Category> tree;
}

@Data
@Document
@NoArgsConstructor
@AllArgsConstructor
public class Category {
    @Id
    private String id;
    private List<CatalogMember> catalogs;
    private List<CategoryMember> children;
    @Transient
    private List<Category> subCategories;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CatalogMember {
    private String id;
    private Long sequence;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CategoryMember {
    private String id;
    private Long sequence;
}
Run Code Online (Sandbox Code Playgroud)

现在我有其他 API 模型可以在前端进一步使用:

@Getter
@Builder
public class CatalogByStoreIdResponse {
    private final String id;
    private final List<CategoryResponse> tree;
}

@Getter
@Builder
public class CategoryResponse {
    private final String id;
    private final List<CategoryResponse> children;
}
Run Code Online (Sandbox Code Playgroud)

目前我有以下代码来检索所有文档和设置Catalog::tree以及Category::subCategories

@Slf4j
@Component
@RequiredArgsConstructor
public class CatalogByStoreIdHandler implements HandlerFunction<ServerResponse> {
    private final CatalogReactiveRepository catalogReactiveRepository;
    private final CategoryReactiveRepository categoryReactiveRepository;
    private final CatalogByStoreIdResponseMapper catalogByStoreIdResponseMapper;

    @Override
    public Mono<ServerResponse> handle(ServerRequest request) {
        String storeId = request.pathVariable("storeId");

        return ok()
                .body(catalogReactiveRepository.findFirstByStoresContains(storeId)
                        .doOnNext(catalog -> categoryReactiveRepository.findAllByCatalogsIdContainsOrderByCatalogsSequenceAsc(catalog.getId())
                                .collectList()
                                .doOnNext(tree -> catalog.setTree(tree))
                                .flatMapMany(Flux::fromIterable)
                                .expandDeep(category -> Flux.fromIterable(category.getChildren())
                                        .map(CategoryMember::getId)
                                        .collectList()
                                        .flatMapMany(categoryReactiveRepository::findAllByIdIn)
                                        .collectList()
                                        .doOnNext(subCategories -> category.setSubCategories(subCategories))
                                        .flatMapMany(Flux::fromIterable))
                                .subscribe())
                        .map(catalogByStoreIdResponseMapper), CatalogByStoreIdResponse.class);
    }
}
Run Code Online (Sandbox Code Playgroud)

这是基本的映射器:

@Component
@RequiredArgsConstructor
public class CatalogByStoreIdResponseMapper implements Function<Catalog, CatalogByStoreIdResponse> {
    private final CategoryResponseMapper categoryResponseMapper;

    @Override
    public CatalogByStoreIdResponse apply(Catalog catalog) {
        return CatalogByStoreIdResponse.builder()
                .id(catalog.getId())
                .tree(Stream.ofNullable(catalog.getTree()) // catalog.getTree() is always null
                        .flatMap(List::stream)
                        .map(categoryResponseMapper)
                        .collect(Collectors.toList()))
                .build();
    }
}
Run Code Online (Sandbox Code Playgroud)

现在我的问题是映射函数在实际设置树之前被调用,因此我的响应中的树是空的。

我知道这是反应式编程中的正确行为,但我需要知道如何解决我的问题,以便我的树字段不为空。

dtr*_*unk 0

通过以下代码,我得到了我需要的结果:

return ok()
        .body(catalogReactiveRepository.findFirstByStoresContains(storeId)
                .zipWhen(catalog -> categoryReactiveRepository.findAllByCatalogsIdContainsOrderByCatalogsSequenceAsc(catalog.getId())
                        .collectList()
                        .doOnNext(catalog::setTree)
                        .flatMapMany(Flux::fromIterable)
                        .expand(category -> Flux.fromIterable(Optional.ofNullable(category.getChildren()).orElse(Collections.emptyList()))
                                .map(CategoryMember::getId)
                                .collectList()
                                .flatMapMany(categoryReactiveRepository::findAllByIdIn)
                                .collectList()
                                .doOnNext(category::setSubCategories)
                                .flatMapMany(Flux::fromIterable))
                        .collectList())
                .map(Tuple2::getT1)
                .map(catalogByStoreIdResponseMapper), CatalogByStoreIdResponse.class);
Run Code Online (Sandbox Code Playgroud)

结果:

{

    "id": "DEMO_CATALOG",
    "tree": [
        {
            "id": "DEMO_CATEGORY_1",
            "children": [
                {
                    "id": "DEMO_CATEGORY_1_1",
                    "children": [ ]
                }
            ]
        },
        {
            "id": "DEMO_CATEGORY_2",
            "children": [
                {
                    "id": "DEMO_CATEGORY_2_1",
                    "children": [ ]
                }
            ]
        }
    ]

}
Run Code Online (Sandbox Code Playgroud)

如果有改进的空间,请告诉我。