gst*_*low 2 java reactor reactive-programming spring-webflux
我读到有一个选项可以使用 Mono 进行阻塞调用。所以我尝试写几个代码片段:
A)
Mono.just("qwerty")
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.boundedElastic())
.block()
Run Code Online (Sandbox Code Playgroud)
二)
Mono<String> customMono = Mono.just("qwerty");
Mono<String> blockedMono = customMono
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.boundedElastic());
System.out.println("blockedMono.block(): " + blockedMono.block());
Run Code Online (Sandbox Code Playgroud)
C)
Mono<String> customMono = Mono.just("qwerty");
Mono<String> blockedMono = Mono.just(0)
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.boundedElastic())
.then(customMono);
System.out.println("blockedMono.block(): " + blockedMono.block());
Run Code Online (Sandbox Code Playgroud)
导致同样的错误:
block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1
Run Code Online (Sandbox Code Playgroud)
有办法解决吗?
PS我需要一个阻塞调用,并且我知道在反应代码中使用阻塞操作并不好
PS2
这可行,但我想避免转换为 Future
Mono.just("qwerty").toFuture().get()
Run Code Online (Sandbox Code Playgroud)
PS3
正如@dan1st 注意到的,行为取决于执行上下文。
如果我们将所有这些代码片段放入 main 方法中,它们都可以正常工作,没有任何异常
如果我们将代码放入其中,就会遇到主题中描述的行为
@GetMapping(..)
public void someEndpoint(...) {
// snippet is here
}
Run Code Online (Sandbox Code Playgroud)
所以这种行为在某种程度上取决于 Spring Web Flux
您能解释一下原因以及如何解决吗?
根据答案,我能够编写下面的代码块。因此,该行不会引发任何异常,但会返回null。
org.springframework.security.oauth2.jwt.Jwt jwt = it.block();
Run Code Online (Sandbox Code Playgroud)
@GetMapping("/test")
public Mono<org.springframework.security.oauth2.jwt.Jwt> test() throws ExecutionException, InterruptedException {
Mono<org.springframework.security.oauth2.jwt.Jwt> firstMono =
ReactiveSecurityContextHolder.getContext()
.publishOn(Schedulers.boundedElastic()) //<-- this allows .block() somewhere downstream
.subscribeOn(Schedulers.boundedElastic())
.flatMap(securityContext -> Mono.just((org.springframework.security.oauth2.jwt.Jwt) securityContext.getAuthentication().getPrincipal()));
Mono<org.springframework.security.oauth2.jwt.Jwt> secondMono = Mono.just(firstMono)
.publishOn(Schedulers.boundedElastic())
.map(it -> {
org.springframework.security.oauth2.jwt.Jwt jwt = it.block();
System.out.println(Thread.currentThread() + "-" + jwt);
return jwt;
});
return secondMono;
}
Run Code Online (Sandbox Code Playgroud)
因此端点方法失败并出现错误:
java.lang.NullPointerException: The mapper [....my.Controller$$Lambda$2012/0x0000000800b68840] returned a null value
Run Code Online (Sandbox Code Playgroud)
但如果我写
@GetMapping("/test")
public Mono<org.springframework.security.oauth2.jwt.Jwt> test() throws ExecutionException, InterruptedException {
Mono<org.springframework.security.oauth2.jwt.Jwt> firstMono =
ReactiveSecurityContextHolder.getContext()
.map(securityContext ->(org.springframework.security.oauth2.jwt.Jwt) securityContext.getAuthentication().getPrincipal());
return firstMono;
}
Run Code Online (Sandbox Code Playgroud)
一切正常,响应包含 JWT。看起来这是因为 spring webflux 上下文魔法。
这是我在另一篇文章中的回答的改写和澄清。
当你在reactor中阻塞时,意味着调用的线程block()被锁定,等待阻塞操作结束。
一个重要的事实是,被阻塞的线程是调用线程,而不是发布者的线程之一。它可能看起来像是一个显而易见的陈述,但我过去曾被这个错误所困扰,许多其他用户也曾被这个错误困扰过(例如,这是您问题的主题)。
由于块不是反应式管道的一部分,因此当您执行以下操作时:
public static void main(String[] args) {
Mono.just(1000)
.publishOn(Schedulers.single())
.flatMap(delay -> Mono.delay(Duration.ofMillis(delay)) // <1>
.block(); // <2>
}
Run Code Online (Sandbox Code Playgroud)
当你这样做时:
public static void main(String[] args) {
var publisher = Mono.just(1000)
.subscribeOn(Schedulers.single())
.flatMap(delay -> Mono.delay(Duration.ofMillis(delay));
Mono.just("whatever")
.publishOn(Schedulers.boundedElastic())
.map(it -> publisher.block()) // <1>
.block(); // <2>
}
Run Code Online (Sandbox Code Playgroud)
为了阻止其使用,Reactor 强制执行上下文验证。当block()调用时,它会检查调用线程是什么,如果它认为调用线程来自阻塞的不兼容调度程序,则会引发错误,以防止锁定线程。
大多数Reactor 和 Webflux 调度程序与不兼容block(),因为它们是为使用最少的资源实现高吞吐量而设计的。
因此,当您返回包含 webflux 中的块操作的发布者时,大多数情况下它将在不接受它的调度程序中执行,并且最终会出现您所描述的错误。
首先,也是最重要的一点,尽量避免阻塞。验证这是否可以避免。否则:
Scheduler blockingCompatibleScheduler = Schedulers.boundedElastic();
Mono<T> toBlock = Mono...
Mono<T> wrapper = Mono.fromCallable(() -> toBlock.block())
.subscribeOn(blockingCompatibleScheduler);
// or
Mono<T> wrapper = Mono.just("any")
.publishOn(blockingCompatibleScheduler)
.map(it -> toBlock.block());
Run Code Online (Sandbox Code Playgroud)
更多详细信息请参阅官方文档的专门部分到目前为止,我指的是执行上下文。但还有第二种: 国家背景。
从 Reactor 3.1 开始,提供了Context / ContextView API 来跨链式订阅(从下游到上游)共享上下文信息。
官方文档包含关于此机制的专门部分,以提供深入的解释(这相当复杂)。
由于其性质,block操作员会阻止该机制发挥作用。当上下文将信息从下游订阅传播到内部/上游订阅时,它无法向block操作员提供上下文信息:它使用隐藏订阅,与父/下游订阅/管道断开连接。换句话说,flux/mono 无法访问内部被阻止的发布者,因此无法在其中传播上下文。
我们可以用一个简化的例子来测试它:
import reactor.core.publisher.Mono;
public class BlockBreaksContext {
static final Object CTX_KEY = new Object();
/**
* Add "Hello" message to the provided action context, then run/block it and print
* output value.
*/
static void execute(Mono<String> action) {
String value = Mono.from(action)
.contextWrite(ctx -> ctx.put(CTX_KEY, "Hello"))
.block();
System.out.println(value);
}
public static void main(String[] args) {
Mono<String> getContextValue = Mono.deferContextual(ctx
-> Mono.just(ctx.getOrDefault(CTX_KEY, "No value from context")));
// Without blocking, the mono will receive the context written by downstream subscription
execute(getContextValue.map(it -> "NO BLOCKING: " + it));
// With blocking, the Mono is **not** part of the "main" pipeline/subscription.
// Therefore, it cannot receive context on execution, because block cause an
// independent/isolated subscription, whose main chain of event is unaware of.
Mono wrapBlock = Mono.fromCallable(() -> getContextValue.block());
execute(wrapBlock.map(it -> "BLOCKING: " + it));
}
}
Run Code Online (Sandbox Code Playgroud)
该程序打印:
NO BLOCKING: Hello
BLOCKING: No value from context
Run Code Online (Sandbox Code Playgroud)
在您的问题中,您尝试访问安全令牌。由于令牌是根据客户端请求解析的,Webflux 将其放入响应发布者上下文中。当您阻止时,您将取消被阻止的发布者与响应发布者的关联。
| 归档时间: |
|
| 查看次数: |
9091 次 |
| 最近记录: |