jim*_*arn 5 spring project-reactor
在事先不知道最高值的情况下,如何筛选发布者以获取具有最高值的元素?
这是一个小测试来说明我想要实现的目标:
@Test
fun filterForHighestValuesTest() {
val numbers = Flux.just(1, 5, 7, 2, 8, 3, 8, 4, 3)
// what operators to apply to numbers to make the test pass?
StepVerifier.create(numbers)
.expectNext(8)
.expectNext(8)
.verifyComplete()
}
Run Code Online (Sandbox Code Playgroud)
我从reduce 运算符开始:
@Test
fun filterForHighestValuesTestWithReduce() {
val numbers = Flux.just(1, 5, 7, 2, 8, 3, 8, 4, 3)
.reduce { a: Int, b: Int -> if( a > b) a else b }
StepVerifier.create(numbers)
.expectNext(8)
.verifyComplete()
}
Run Code Online (Sandbox Code Playgroud)
当然,该测试通过了,但只会发出一个Mono,而我想获得Flux包含具有最高值的所有元素,例如这个简单示例中的 8 和 8 。
首先,您需要为此提供状态,因此您需要小心每个订阅的状态。确保组合运算符时的一种方法是使用compose.
Flux<Integer> allMatchingHighest = numbers.compose(f -> {
AtomicInteger highestSoFarState = new AtomicInteger(Integer.MIN_VALUE);
AtomicInteger windowState = new AtomicInteger(Integer.MIN_VALUE);
return f.filter(v -> {
int highestSoFar = highestSoFarState.get();
if (v > highestSoFar) {
highestSoFarState.set(v);
return true;
}
if (v == highestSoFar) {
return true;
}
return false;
})
.bufferUntil(i -> i != windowState.getAndSet(i), true)
.log()
.takeLast(1)
.flatMapIterable(Function.identity());
});
Run Code Online (Sandbox Code Playgroud)
请注意,整个composelamdba 可以提取到一个方法中,使代码使用方法引用并且更具可读性。
该解决方案分 4 个步骤完成,前两个步骤各有自己的AtomicInteger状态:
filter找出较小的元素。这会导致Flux<Integer>(单调)增加的数字,例如1 5 7 8 8。buffer由相同数量的块组成。我们使用orbufferUntil代替,因为最退化的情况是数字都不同,并且已经排序的结果会失败window*groupBytakeLast(1)跳过除一个 ( )之外的所有缓冲区flatMapIterable)这StepVerifier通过发出正确地通过了您的测试8 8。请注意,发出的中间缓冲区是:
onNext([1])
onNext([5])
onNext([7, 7, 7])
onNext([8, 8])
Run Code Online (Sandbox Code Playgroud)
bufferUntil一个更复杂的来源会失败,groupBy但不是这个解决方案:
Random rng = new Random();
//generate 258 numbers, each randomly repeated 1 to 10 times
//also, shuffle the whole thing
Flux<Integer> numbers = Flux
.range(1, 258)
.flatMap(i -> Mono.just(i).repeat(rng.nextInt(10)))
.collectList()
.map(l -> {
Collections.shuffle(l);
System.out.println(l);
return l;
})
.flatMapIterable(Function.identity())
.hide();
Run Code Online (Sandbox Code Playgroud)
这是它可以过滤到的缓冲区序列的一个示例(请记住仅重播最后一个缓冲区):
onNext([192])
onNext([245])
onNext([250])
onNext([256, 256])
onNext([257])
onNext([258, 258, 258, 258, 258, 258, 258, 258, 258])
onComplete()
Run Code Online (Sandbox Code Playgroud)
注意:如果删除该map随机播放,则会出现“退化情况”,甚至windowUntil无法正常工作(这takeLast将导致太多打开但未使用的窗口)。
这是一个有趣的想法!
| 归档时间: |
|
| 查看次数: |
7478 次 |
| 最近记录: |