如何过滤通量中具有最高值的所有元素

jim*_*arn 5 spring project-reactor

在事先不知道最高值的情况下,如何筛选发布者以获取具有最高值的元素?

这是一个小测试来说明我想要实现的目标:

@Test
fun filterForHighestValuesTest() {
    val numbers = Flux.just(1, 5, 7, 2, 8, 3, 8, 4, 3)
        // what operators to apply to numbers to make the test pass?

    StepVerifier.create(numbers)
        .expectNext(8)
        .expectNext(8)
        .verifyComplete()
}
Run Code Online (Sandbox Code Playgroud)

我从reduce 运算符开始:

@Test
fun filterForHighestValuesTestWithReduce() {

    val numbers = Flux.just(1, 5, 7, 2, 8, 3, 8, 4, 3)
        .reduce { a: Int, b: Int -> if( a > b) a else b }

    StepVerifier.create(numbers)
        .expectNext(8)
        .verifyComplete()
}
Run Code Online (Sandbox Code Playgroud)

当然,该测试通过了,但只会发出一个Mono,而我想获得Flux包含具有最高值的所有元素,例如这个简单示例中的 8 和 8 。

Sim*_*slé 1

首先,您需要为此提供状态,因此您需要小心每个订阅的状态。确保组合运算符时的一种方法是使用compose.

建议的解决方案

Flux<Integer> allMatchingHighest = numbers.compose(f -> {
        AtomicInteger highestSoFarState = new AtomicInteger(Integer.MIN_VALUE);
        AtomicInteger windowState = new AtomicInteger(Integer.MIN_VALUE);

        return f.filter(v -> {
            int highestSoFar = highestSoFarState.get();
            if (v > highestSoFar) {
                highestSoFarState.set(v);
                return true;
            }
            if (v == highestSoFar) {
                return true;
            }
            return false;
        })
                .bufferUntil(i -> i != windowState.getAndSet(i), true)
                .log()
                .takeLast(1)
                .flatMapIterable(Function.identity());
    });
Run Code Online (Sandbox Code Playgroud)

请注意,整个composelamdba 可以提取到一个方法中,使代码使用方法引用并且更具可读性。

说明

该解决方案分 4 个步骤完成,前两个步骤各有自己的AtomicInteger状态:

  1. 逐步找到新的“最高”元素(到目前为止)并filter找出较小的元素。这会导致Flux<Integer>(单调)增加的数字,例如1 5 7 8 8
  2. buffer由相同数量的块组成。我们使用orbufferUntil代替,因为最退化的情况是数字都不同,并且已经排序的结果会失败window*groupBy
  3. takeLast(1)跳过除一个 ( )之外的所有缓冲区
  4. “重放”最后一个缓冲区,它表示最高值出现的次数 ( flatMapIterable)

StepVerifier通过发出正确地通过了您的测试8 8。请注意,发出的中间缓冲区是:

onNext([1])
onNext([5])
onNext([7, 7, 7])
onNext([8, 8])
Run Code Online (Sandbox Code Playgroud)

更先进的测试,证明bufferUntil

一个更复杂的来源会失败,groupBy但不是这个解决方案:

Random rng = new Random();
//generate 258 numbers, each randomly repeated 1 to 10 times
//also, shuffle the whole thing
Flux<Integer> numbers = Flux
        .range(1, 258)
        .flatMap(i -> Mono.just(i).repeat(rng.nextInt(10)))
        .collectList()
        .map(l -> {
            Collections.shuffle(l);
            System.out.println(l);
            return l;
        })
        .flatMapIterable(Function.identity())
        .hide();
Run Code Online (Sandbox Code Playgroud)

这是它可以过滤到的缓冲区序列的一个示例(请记住仅重播最后一个缓冲区):

onNext([192])
onNext([245])
onNext([250])
onNext([256, 256])
onNext([257])
onNext([258, 258, 258, 258, 258, 258, 258, 258, 258])
onComplete()
Run Code Online (Sandbox Code Playgroud)

注意:如果删除该map随机播放,则会出现“退化情况”,甚至windowUntil无法正常工作(这takeLast将导致太多打开但未使用的窗口)。

这是一个有趣的想法!