Spring Reactor:以非阻塞方式添加延迟

Pat*_*Pat 5 java spring reactive-programming project-reactor spring-webflux

关于如何在方法中添加延迟但以非阻塞方式添加延迟的小问题。

模拟长流程的一种非常流行的方法是使用Thread.sleep(); 但是,对于Reactor项目来说,这是一个阻塞操作。众所周知,在反应式项目中,我们不应该阻塞。

我想实验和模拟长过程。某种方法将花费大量时间,但以非阻塞方式,无需交换线程。这是为了模拟一个非常冗长的方法,但经过 BlockHound 等证明是非阻塞的。

这种结构非常流行:

@Test
    public void simulateLengthyProcessingOperationReactor() {
        Flux.range(1,5000)
                .map(a -> simulateLengthyProcessingOperation(a))
                .subscribe(System.out::println);
    }

    public String simulateLengthyProcessingOperation(Integer input) {
        simulateDelayBLOCKING();
        return String.format("[%d] on thread [%s] at time [%s]", input, Thread.currentThread().getName(), new Date());
    }

    public void simulateDelayBLOCKING() {
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
Run Code Online (Sandbox Code Playgroud)

但它正在阻塞。(我知道有,Mono.fromCallable(() ->但这不是问题)

是否可以做同样的事情,模拟延迟,但非阻塞?另外,.delay不会达到预期的结果(在同一反应管道上模拟非阻塞的冗长方法)

@Test
    public void simulateLengthyProcessingOperationReactor() {
        Flux.range(1,5000)
                .map(a -> simulateLengthyProcessingOperation(a))
                .subscribe(System.out::println);
    }

    public String simulateLengthyProcessingOperation(Integer input) {
        simulateDelay_NON_blocking();
        return String.format("[%d] on thread [%s] at time [%s]", input, Thread.currentThread().getName(), new Date());
    }

    public void simulateDelay_NON_blocking() {
        //simulate lengthy process, but WITHOUT blocking
    }
Run Code Online (Sandbox Code Playgroud)

谢谢

ker*_*ter 6

当然可以,有一系列方法.delay...()

例如,您可以delayElements()在此处阅读有关方法的信息: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#delayElements-java.time.Duration-

您应该知道它将执行线程切换到另一个Scheduler。信号被延迟并在并行默认调度程序上继续。

在最简单的情况下,它看起来像这样:

public void simulateLengthyProcessingOperationReactor() {
    Flux.range(1,5000)
            .delayElements(Duration.ofMillis(1000L)) // delay each element for 1000 millis
            .subscribe(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)

根据您的情况,您可以这样编写代码:

@Test
public void simulateLengthyProcessingOperationReactor() {
    Flux.range(1,5000)
            .concatMap(this::simulateDelay_NON_blocking)
            .subscribe(System.out::println);
}

public Mono<String> simulateDelay_NON_blocking(Integer input) {
    //simulate lengthy process, but WITHOUT blocking
    return Mono.delay(Duration.ofMillis(1000L))
            .map(__ -> String.format("[%d] on thread [%s] at time [%s]",
                    input, Thread.currentThread().getName(), new Date()));
}
Run Code Online (Sandbox Code Playgroud)