如果满足条件,Reactor如何重复某个步骤n次

Ksu*_*Ksu 5 project-reactor

请帮助我使用反应器,我需要最多检查一个条件 n 次并返回最终结果

我发现reactor有reactor-extra模块

https://projectreactor.io/docs/extra/snapshot/api/reactor/retry/Repeat.html

它具有构造 Repeat.create(java.util.function.Predicate<? super RepeatContext<T>> predicate, long n) Repeat 函数,仅当谓词返回 true 时才重复 n 次。

这看起来是正确的解决方案,但我不明白
我想重复的操作应该在哪里?我有很多动作的 Flux,但我只想重复一个

请举一个代码示例

谢谢

private int culculateNextResult(some params) {
          // some implementation  
 }



private Boolean compareResults(int prevRes, int nextRes) {
          // some implementation
 }

 public Flux<Boolean> run(some params, Flux<Integer> prevResults){

      return prevResults.map(elem -> compareResults(elem, culculateNextResult(some params)));

 // THIS LOGIC SHOULD BE REPEATED N times if compareResults(elem,       
 // culculateNextResult(some params))) == false, if true, we don't need 
// to repeat 
     }
Run Code Online (Sandbox Code Playgroud)

我想重复compareResults(elem, culculateNextResult(some params))) 直到它不成立。但 n 倍最大值并返回 Flux 作为结果

Phi*_*lay 6

Flux.repeat并将Mono.repeat重新订阅源,因此源的每个先前步骤都将通过新订阅重复。

由于calculateNextResultcompareResults在您的示例中都是同步操作,因此您可以使用一个简单的for循环来重复...

    public Flux<Boolean> run(some params, Flux<Integer> prevResults){
        return prevResults.map(elem -> {
            for (int i = 0; i < 5; i++) {
                if (compareResults(elem, calculateNextResult(some params))) {
                    return true;
                }
            }
            return false;
        });
    }
Run Code Online (Sandbox Code Playgroud)

如果calculateNextResult反应compareResults式方法返回Mono,那么您可以使用flatMap代替map,并使用其中一种Mono.repeat*方法。

例如,这样的事情:

    private Mono<Integer> calculateNextResult(some params) {
        // some implementation
    }

    private Mono<Boolean> compareResults(int prevRes, int nextRes) {
        // some implementation
    }
    public Flux<Boolean> run(some params, Flux<Integer> prevResults){

        return prevResults.flatMap(prevResult -> 

            calculateNextResult(some params)
                    .flatMap(nextResult -> compareResults(prevResult, nextResult))
                    .filter(comparisonResult -> comparisonResult)
                    .repeatWhenEmpty(Repeat.times(5))
                    .defaultIfEmpty(false));
    }

Run Code Online (Sandbox Code Playgroud)

在此示例中,repeatWhenEmpty将导致对在 flatMap 中创建的 Mono 进行新订阅,这将导致calculateNextResult重新计算(假设返回的 MonocalculateNextResult设置为计算每个订阅的值)。