Mat*_*ley 2 java reactive-programming project-reactor spring-webflux
我使用 Spring WebFlux(Project Reactor),但面临以下问题:我必须从 db 获取一些数据才能使用它们来调用另一项服务 - 一个流中的所有内容。怎么做?
public Mono<MyObj> saveObj(Mono<MyObj> obj) {
    return obj
        .flatMap(
            ob->
                Mono.zip(
                        repo1.save(
                          ...),
                        repo2
                            .saveAll(...)
                            .collectList(),
                        repo3
                            .saveAll(...)
                            .collectList())
                    .map(this::createSpecificObject))
        .doOnNext(item-> createObjAndCallAnotherService(item));
  }
private void createObjAndCallAnotherService(Prot prot){
myRepository
        .findById(
            prot.getDomCred().stream()
                .filter(Objects::nonNull)
                .findFirst()
                .map(ConfDomCred::getCredId)
                .orElse(UUID.fromString("00000000-0000-0000-0000-000000000000")))
        .doOnNext( //one value is returned from myRepository -> Flux<MyObjectWithNeededData>
            confCred-> {//from this point the code is unreachable!!! - why????
              Optional<ConfDomCred> confDomCred=
                  prot.getDomCreds().stream().filter(Objects::nonNull).findFirst();
              confDomCred.ifPresent(
                  domCred -> {
                    ProtComDto com=
                        ProtComDto.builder()
                            .userName(confCred.getUsername())
                            .password(confCred.getPassword())                          
                            .build();
                    clientApiToAnotherService.callEndpintInAnotherService(com); //this is a client like Feign that invokes method in another service
                  });
            });
}
更新
当我调用
    Flux<MyObj> myFlux =  myRepository
            .findById(
                prot.getDomCred().stream()
                    .filter(Objects::nonNull)
                    .findFirst()
                    .map(ConfDomCred::getCredId)
                    .orElse(UUID.fromString("00000000-0000-0000-0000-000000000000")));
myFlux.subscribe(e -> e.getPassword()) 
然后打印值
更新2
所以作为一个回顾 - 我认为下面的代码是异步/非阻塞的 - 我对吗?在我的
保护命令服务
我不得不使用 subscribe() 两次 - 只有这样我才能调用我的其他服务并将它们存储为我的对象: commandControllerApi.createNewCommand
public Mono<Protection> saveProtection(Mono<Protection> newProtection) {
    return newProtection.flatMap(
        protection ->
            Mono.zip(
                    protectorRepository.save(//some code),
                    domainCredentialRepository
                        .saveAll(//some code)
                        .collectList(),
                    protectionSetRepository
                        .saveAll(//some code)
                        .collectList())
                .map(this::createNewObjectWrapper)
                .doOnNext(protectionCommandService::createProtectionCommand));
  }
ProtectionCommandService 类:
public class ProtectionCommandService {
  private final ProtectionCommandStrategyFactory protectionCommandFactory;
  private final CommandControllerApi commandControllerApi;
  public Mono<ProtectionObjectsWrapper> createProtectionCommand(
      ProtectionObjectsWrapper protection) {
    ProductType productType = protection.getProtector().getProductType();
    Optional<ProtectionCommandFactory> commandFactory = protectionCommandFactory.get(productType);
    commandFactory
        .get()
        .createCommandFromProtection(protection)
        .subscribe(command -> commandControllerApi.createNewCommand(command).subscribe());
    return Mono.just(protection);
  }
}
以及 2 家工厂之一:
@Component
@AllArgsConstructor
@Slf4j
public class VmWareProtectionCommandFactory implements ProtectionCommandFactory {
  private static final Map<ProductType, CommandTypeEnum> productTypeToCommandType =
      ImmutableMap.of(...//some values);
  private final ConfigurationCredentialRepository configurationCredentialRepository;
  @Override
  public Mono<CommandDetails> createCommandFromProtection(ProtectionObjectsWrapper protection) {
    Optional<DomainCredential> domainCredential =
        protection.getDomainCredentials().stream().findFirst();
    return configurationCredentialRepository
        .findByOwnerAndId(protection.getOwner(), domainCredential.get().getCredentialId())
        .map(credential -> createCommand(protection, credential, domainCredential.get()));
  }
并且 createCommand 方法作为这个工厂的结果返回 Mono 对象。
private Mono<CommandDetails> createCommand(Protection protection
     //other parameters) {
    CommandDto commandDto =
        buildCommandDto(protection, confCredential, domainCredentials);
    String commands = JsonUtils.toJson(commandDto);
    CommandDetails details = new CommandDetails();
    details.setAgentId(protection.getProtector().getAgentId().toString());
    details.setCommandType(///some value);
    details.setArguments(//some value);
    return Mono.just(details);
更新3
我调用一切的主要方法已经改变了一点:
public Mono<MyObj> saveObj(Mono<MyObj> obj) {
    return obj
        .flatMap(
            ob->
                Mono.zip(
                        repo1.save(
                          ...),
                        repo2
                            .saveAll(...)
                            .collectList(),
                        repo3
                            .saveAll(...)
                            .collectList())
.map(this::wrapIntoAnotherObject)
.flatMap(protectionCommandService::createProtectionCommand)
.map(this::createMyObj));
停止打破链条
这是一个纯函数,它返回一些东西,无论我们给它什么,它总是返回相同的东西。它没有副作用。
public Mono<Integer> fooBar(int number) {
    return Mono.just(number);
}
我们可以调用它并链接起来,因为它会返回一些东西。
foobar(5).flatMap(number -> { ... }).subscribe();
这是一个非纯函数,我们不能上链,我们正在打破链。我们无法订阅,在我们订阅之前什么也不会发生。
public void fooBar(int number) {
    Mono.just(number)
}
fooBar(5).subscribe(); // compiler error
但我想要一个空函数,我想要,我想要我想要.... wuuaaa wuaaaa
我们总是需要返回一些东西,以便我们可以触发链中的下一部分。程序如何知道何时运行下一部分?但是假设我们想忽略返回值而只触发下一部分。那么我们可以返回一个Mono<Void>.
public Mono<Void> fooBar(int number) {
    System.out.println("Number: " + number);
    return Mono.empty();
}
foobar(5).subscribe(); // Will work we have not broken the chain
你的例子:
private void createObjAndCallAnotherService(Prot prot){
    myRepository.findById( ... ) // breaking the chain, no return
}
以及其他一些提示:
MyObj和saveObj,myRepositorycreateObjAndCallAnotherServicecreateObjAndCallAnotherService这是做两件事,因此得名。更新
你还在犯同样的错误。
commandFactory // Here you are breaking the chain because you are ignoring the return type
    .get()
    .createCommandFromProtection(protection)
    .subscribe(command -> commandControllerApi.createNewCommand(command)
.subscribe()); // DONT SUBSCRIBE you are not the consumer, the client that initiated the call is the subscriber
return Mono.just(protection);
你想要做的是:
return commandFactory.get()
    .createCommandFrom(protection)
    .flatMap(command -> commandControllerApi.createNewCommand(command))
    .thenReturn(protection);
停止破坏链条,除非您的服务是最终消费者或发起呼叫的人,否则不要订阅。
| 归档时间: | 
 | 
| 查看次数: | 1529 次 | 
| 最近记录: |