基于reactor中的条件检查的异步顺序调用

Vij*_*dey 8 java reactor reactive-programming java-8 spring-boot

在这里,我试图使用reactor进行异步和非阻塞调用,对于每个请求,我可能必须按顺序调用两个服务(在我的情况下,下面,getAccountInfoFromAAAgetAccountInfoFromBBB).

这是我的ItemRequest对象:

public class ItemRequest {
    private Account account;
    private Result firstServiceResult;
    private Result secondServiceResult;
    private PostingParameterCode postingParameterCode; //enum 
    //...
    //...
    //getters and setters
}
Run Code Online (Sandbox Code Playgroud)

所以,我的请求输入将包含多个itemRequests,对于每个itemRequest,我正在进行异步调用:

public void getAccountData(List<ItemRequest> itemRequests) {
    ImmutableList<ItemRequest> list = ImmutableList.copyOf(itemRequests);
    Flux.fromIterable(list).flatMap(this::callBothSors).blockLast();
}

public Mono<ItemRequest> callBothSors(ItemRequest itemRequest) {
    return getAccountDataService.getAccountDataFromAAAandBBB(itemRequest); 
    //here, it will enter into a sequential call for each itemRequest
}
Run Code Online (Sandbox Code Playgroud)

这是我的第一个服务调用界面:

public Mono<ItemRequest> getAccountDataFromAAA(ItemRequest itemRequest);
Run Code Online (Sandbox Code Playgroud)

这是我的第二个服务调用界面:

public Mono<ItemRequest> getAccountDataFromBBB(ItemRequest itemRequest);
Run Code Online (Sandbox Code Playgroud)

此方法将根据条件按顺序最多进行两次调用:

public Mono<ItemRequest> getAccountDataFromAAAandBBB(ItemRequest itemRequest){
    Mono<ItemRequest> firstCallResult = Mono.empty();
    Mono<ItemRequest> secondCallResult = Mono.empty();

if(isFirstServiceCallRequired(itemRequest)){
    firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest); 
//basically, firstService call will update the accountKey information and
//will also set the result status to OK which is required to decide 
//whether to make secondService call.
} else {
    //Account key is already present, so just update the result status which I need later.
    Result result = new Result();
    result.setStatus(Result.Status.OK);
    result.setMessageText("First call not required as account info is set for item request");
    itemRequest.setFirstServiceResult(result);
}

//Now, before calling the second service, I need to check the following:

if(null!= itemRequest.getFirstServiceResult() && 
    itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) && 
    itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT)){ 
        secondCallResult = this.secondServiceCallImpl.getAccountDataFromBBB(itemRequest);
    }

    return firstCallResult.then(secondCallResult);  //attaching the
    //firstCallResult and secondCallResult to produce a single Mono
Run Code Online (Sandbox Code Playgroud)

}

firstCallResult不需要时,这工作正常.但是当需要第一次调用时,这个条件检查不会通过,因为我没有更新第一个调用结果对象:

if(null != itemRequest.getFirstServiceResult() && 
    itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) && 
    itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT))) { ... } 
 //this condition check will not pass because first service call is not actually executing
Run Code Online (Sandbox Code Playgroud)

如果我提出以下声明,两种情况都可以正常工作:

if(isFirstServiceCallRequired(itemRequest)){
        firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest); 
        firstCallResult.block(); //adding this case will work on both cases 
    }
Run Code Online (Sandbox Code Playgroud)

但是,我认为我不会让这些反应堆受益.我想要有这样的逻辑:

Mono<ItemRequest> result = firstService.call(...)
    .doOnNext(/*do something */)
    .then( ... secondService.call())
Run Code Online (Sandbox Code Playgroud)

但无法弄清楚用firstService链接secondService的方法来获得单声道结果​​并进行那些条件检查.条件检查很重要,因为我并不总是想要执行第二项服务.有没有办法用firstService链接secondService以获得结果并进行那些条件检查?

为长期问题道歉.任何建议/帮助将不胜感激.

Vij*_*dey 5

在为这个问题提供奖励积分后,我真的很兴奋并期待一些答案。但无论如何,我能够改进我的初始解决方案并进行这些条件检查。

我做了以下事情:我在两个服务调用中都将返回类型从 更改Mono<ItemRequest>Mono<Void>,因为我基本上是将数据更新为ItemRequest列表:

这里处理并行调用(每个并行调用都有一个顺序调用):

public void getAccountData(List<ItemRequest> itemRequests) {
        ImmutableList<ItemRequest> list = ImmutableList.copyOf(itemRequests);
        Flux.fromIterable(list).flatMap(this::callBothSors).blockLast();
    }

    public Mono<Void> callBothSors(ItemRequest itemRequest) {
        return getAccountDataService.getAccountDataFromAAAandBBB(itemRequest);
        //here, it will enter into a sequential call for each itemRequest
    }
Run Code Online (Sandbox Code Playgroud)

这些是我firstServiceCallsecondServiceCall界面的变化:

public Mono<Void> getAccountDataFromAAA(ItemRequest itemRequest);

public Mono<Void> getAccountDataFromBBB(ItemRequest itemRequest);
Run Code Online (Sandbox Code Playgroud)

我将secondServiceCallwith链接起来firstServiceCall以获得单声道结果​​并进行这些条件检查:

public Mono<Void> getAccountDataFromAAAandBBB(ItemRequest itemRequest){
    Mono<Void> callSequence = Mono.empty();

    if(isFirstServiceCallRequired(itemRequest)){
        callSequence = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
    } else {
        //Account key is already present, so just update the result status which I need later.
        Result result = new Result();
        result.setStatus(Result.Status.OK);
        result.setMessageText("First call not required as account info is set for item request");
        itemRequest.setFirstServiceResult(result);
    }

    return callSequence.thenEmpty(Mono.defer(() -> {
        //note: Mono.defer ==>> Create a Mono provider that will supply a target Mono to subscribe to 
        //for each subscriber downstream.
        //only if the firstServiceCall result is successful & other condition check successful,
        // I am calling secondServiceCall:  
        if(shouldCallSecondService(itemRequest)){
            return this.secondServiceCallImpl.getAccountDataFromAAAandBBB(itemRequest);
        } else {
            return Mono.empty();
        }
    }))
Run Code Online (Sandbox Code Playgroud)