block()/blockFirst()/blockLast() 是阻塞的,线程并行不支持

gst*_*low 2 java reactor reactive-programming spring-webflux

读到有一个选项可以使用 Mono 进行阻塞调用。所以我尝试写几个代码片段:

A)

Mono.just("qwerty")
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.boundedElastic())
.block()
Run Code Online (Sandbox Code Playgroud)

二)

Mono<String> customMono = Mono.just("qwerty");
Mono<String> blockedMono = customMono
        .subscribeOn(Schedulers.boundedElastic())
        .publishOn(Schedulers.boundedElastic());
System.out.println("blockedMono.block(): " + blockedMono.block());
Run Code Online (Sandbox Code Playgroud)

C)

 Mono<String> customMono = Mono.just("qwerty");
 Mono<String> blockedMono =  Mono.just(0)
            .subscribeOn(Schedulers.boundedElastic())
            .publishOn(Schedulers.boundedElastic())
            .then(customMono);    
 System.out.println("blockedMono.block(): " + blockedMono.block());
Run Code Online (Sandbox Code Playgroud)

导致同样的错误:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1
Run Code Online (Sandbox Code Playgroud)

有办法解决吗?

PS我需要一个阻塞调用,并且我知道在反应代码中使用阻塞操作并不好

PS2

这可行,但我想避免转换为 Future

Mono.just("qwerty").toFuture().get()
Run Code Online (Sandbox Code Playgroud)

PS3

正如@dan1st 注意到的,行为取决于执行上下文。

  1. 如果我们将所有这些代码片段放入 main 方法中,它们都可以正常工作,没有任何异常

  2. 如果我们将代码放入其中,就会遇到主题中描述的行为

    @GetMapping(..)
    public void someEndpoint(...) {
       // snippet is here
    }
    
    Run Code Online (Sandbox Code Playgroud)

所以这种行为在某种程度上取决于 Spring Web Flux

您能解释一下原因以及如何解决吗?

更新

根据答案,我能够编写下面的代码块。因此,该行不会引发任何异常,但会返回null

org.springframework.security.oauth2.jwt.Jwt jwt = it.block();
Run Code Online (Sandbox Code Playgroud)
@GetMapping("/test")
public Mono<org.springframework.security.oauth2.jwt.Jwt> test() throws ExecutionException, InterruptedException {
    Mono<org.springframework.security.oauth2.jwt.Jwt> firstMono =
            ReactiveSecurityContextHolder.getContext()
                    .publishOn(Schedulers.boundedElastic()) //<-- this allows .block() somewhere downstream
                    .subscribeOn(Schedulers.boundedElastic())
                    .flatMap(securityContext -> Mono.just((org.springframework.security.oauth2.jwt.Jwt) securityContext.getAuthentication().getPrincipal()));
    Mono<org.springframework.security.oauth2.jwt.Jwt> secondMono = Mono.just(firstMono)
            .publishOn(Schedulers.boundedElastic())
            .map(it -> {
                org.springframework.security.oauth2.jwt.Jwt jwt = it.block();
                System.out.println(Thread.currentThread() + "-" + jwt);
                return jwt;
            });
    return secondMono;
}
Run Code Online (Sandbox Code Playgroud)

因此端点方法失败并出现错误:

java.lang.NullPointerException: The mapper [....my.Controller$$Lambda$2012/0x0000000800b68840] returned a null value
Run Code Online (Sandbox Code Playgroud)

但如果我写

@GetMapping("/test")
public Mono<org.springframework.security.oauth2.jwt.Jwt> test() throws ExecutionException, InterruptedException {
    Mono<org.springframework.security.oauth2.jwt.Jwt> firstMono =
            ReactiveSecurityContextHolder.getContext()
                    .map(securityContext ->(org.springframework.security.oauth2.jwt.Jwt) securityContext.getAuthentication().getPrincipal());        
    return firstMono;
}
Run Code Online (Sandbox Code Playgroud)

一切正常,响应包含 JWT。看起来这是因为 spring webflux 上下文魔法。

ama*_*nin 6

免责声明

这是我在另一篇文章中的回答的改写和澄清。

反应堆堵塞

当你在reactor中阻塞时,意味着调用的线程block()被锁定,等待阻塞操作结束。

一个重要的事实是,被阻塞的线程是调用线程,而不是发布者的线程之一。它可能看起来像是一个显而易见的陈述,但我过去曾被这个错误所困扰,许多其他用户也曾被这个错误困扰过(例如,这是您问题的主题)。

由于不是反应式管道的一部分,因此当您执行以下操作时

public static void main(String[] args) {
   Mono.just(1000)
       .publishOn(Schedulers.single())
       .flatMap(delay -> Mono.delay(Duration.ofMillis(delay)) // <1>
       .block(); // <2>
}
Run Code Online (Sandbox Code Playgroud)
  1. flatMap由单个调度程序执行
  2. block 由程序主线程执行。

当你这样做时:

public static void main(String[] args) {
    var publisher = Mono.just(1000)
        .subscribeOn(Schedulers.single())
        .flatMap(delay -> Mono.delay(Duration.ofMillis(delay));
   
    Mono.just("whatever")
        .publishOn(Schedulers.boundedElastic())
        .map(it -> publisher.block()) // <1>
        .block(); // <2>
}
Run Code Online (Sandbox Code Playgroud)
  1. 第一个块在有界弹性调度程序的线程上执行。
  2. 第二个块从程序主线程执行。

禁止封锁时

为了阻止其使用,Reactor 强制执行上下文验证。当block()调用时,它会检查调用线程是什么,如果它认为调用线程来自阻塞的不兼容调度程序,则会引发错误,以防止锁定线程。

大多数Reactor 和 Webflux 调度程序与不兼容block(),因为它们是为使用最少的资源实现高吞吐量而设计的。

因此,当您返回包含 webflux 中的块操作的发布者时,大多数情况下它将在不接受它的调度程序中执行,并且最终会出现您所描述的错误。

如何在反应式管道中间阻塞

首先,也是最重要的一点,尽量避免阻塞。验证这是否可以避免。否则:

  1. 选择阻塞兼容的调度程序:
  2. 将您想要阻止的发布者与另一个发布者 包装在一起,该发布者将在您的调度程序上发布/安排其操作:
    Scheduler blockingCompatibleScheduler = Schedulers.boundedElastic();
    Mono<T> toBlock = Mono...
    Mono<T> wrapper = Mono.fromCallable(() -> toBlock.block())
                          .subscribeOn(blockingCompatibleScheduler);
    // or
    Mono<T> wrapper = Mono.just("any")
                          .publishOn(blockingCompatibleScheduler)
                          .map(it -> toBlock.block());
    
    Run Code Online (Sandbox Code Playgroud) 更多详细信息请参阅官方文档的专门部分

块破坏了上下文

到目前为止,我指的是执行上下文。但还有第二种: 国家背景。

从 Reactor 3.1 开始,提供了Context / ContextView API 来跨链式订阅(从下游到上游)共享上下文信息。

官方文档包含关于此机制的专门部分,以提供深入的解释(这相当复杂)。

由于其性质,block操作员会阻止该机制发挥作用。当上下文将信息从下游订阅传播到内部/上游订阅时,它无法向block操作员提供上下文信息:它使用隐藏订阅,与父/下游订阅/管道断开连接。换句话说,flux/mono 无法访问内部被阻止的发布者,因此无法在其中传播上下文。

我们可以用一个简化的例子来测试它:

import reactor.core.publisher.Mono;

public class BlockBreaksContext {

    static final Object CTX_KEY = new Object();

    /**
     * Add "Hello" message to the provided action context, then run/block it and print
     * output value.
     */
    static void execute(Mono<String> action) {
        String value = Mono.from(action)
                           .contextWrite(ctx -> ctx.put(CTX_KEY, "Hello"))
                           .block();
        System.out.println(value);
    }

    public static void main(String[] args) {
        Mono<String> getContextValue = Mono.deferContextual(ctx
                -> Mono.just(ctx.getOrDefault(CTX_KEY, "No value from context")));

        // Without blocking, the mono will receive the context written by downstream subscription
        execute(getContextValue.map(it -> "NO BLOCKING: " + it));

        // With blocking, the Mono is **not** part of the "main" pipeline/subscription.
        // Therefore, it cannot receive context on execution, because block cause an
        // independent/isolated subscription, whose main chain of event is unaware of.
        Mono wrapBlock = Mono.fromCallable(() -> getContextValue.block());
        execute(wrapBlock.map(it -> "BLOCKING: " + it));
    }
}
Run Code Online (Sandbox Code Playgroud)

该程序打印:

NO BLOCKING: Hello
BLOCKING: No value from context
Run Code Online (Sandbox Code Playgroud)

在您的问题中,您尝试访问安全令牌。由于令牌是根据客户端请求解析的,Webflux 将其放入响应发布者上下文中。当您阻止时,您将取消被阻止的发布者与响应发布者的关联。