如何在另一个 Mono 终止后触发 Mono 执行

Kog*_*uro 2 java reactive-programming project-reactor

当我尝试执行 Mono insidedoFinally子句时遇到问题。这是我的代码。

public interface Locks {

    Mono<ReactiveDistributedLock> doLock(LockParams params);

    Mono<Boolean> doUnlock(ReactiveDistributedLock lock);

    default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
        return doLock(params)
                .flatMap(lock -> stage.get().doFinally(ignored -> doUnlock(lock)));
}
Run Code Online (Sandbox Code Playgroud)

问题是doUnlock(lock)insidedoFinally()返回一个没有人订阅的单声道,因为doFinally它没有链接。所以异步代码部分doUnlock从未真正执行过。

有没有办法避免这种使用MonoFlux助手?

Ole*_*uka 6

Mono#then.

不幸的是,您无法避免使用 Mono/Flux,一旦您的 API 构建在它之上,但是,您可以通过以下方式解决该问题。

要将多个独立的执行链接起来,这些执行应该一个接一个地订阅,并且第一个的结果将在第一个完成后返回,有一个Mono#then运算符允许编写以下(类似承诺的)代码:

public interface Locks {

    Mono<ReactiveDistributedLock> doLock(LockParams params);

    Mono<Boolean> doUnlock(ReactiveDistributedLock lock);

    default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
        return doLock(params)
                .flatMap(lock -> 
                    stage.get()
                         .flatMap(value -> 
                            doUnlock(lock)
                            .then(Mono.just(value))
                         )
                );
    }
}
Run Code Online (Sandbox Code Playgroud)

在这里,为了链式执行然后释放锁然后返回暂存值,我们使用flatMap映射值作为释放锁并then再次返回暂存值。(承认,听起来很尴尬)

注意,在错误终端信号的情况下,then将被忽略。因此,为了实现try-finally行为,可能需要提供额外的orErrorResume运算符,如以下示例所示:

public interface Locks {

    Mono<ReactiveDistributedLock> doLock(LockParams params);

    Mono<Boolean> doUnlock(ReactiveDistributedLock lock);

    default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
        return doLock(params)
                .flatMap(lock -> 
                    stage.get()
                         .flatMap(value -> 
                            doUnlock(lock)
                            .then(Mono.just(value))
                         )
                         .onErrorResume(t -> 
                            doUnlock(lock)
                            .then(Mono.error(t))
                         )
                );
    }
}
Run Code Online (Sandbox Code Playgroud)