Pat*_*Pat 5 java spring reactive-programming project-reactor spring-webflux
关于如何在方法中添加延迟但以非阻塞方式添加延迟的小问题。
模拟长流程的一种非常流行的方法是使用Thread.sleep();
但是,对于Reactor项目来说,这是一个阻塞操作。众所周知,在反应式项目中,我们不应该阻塞。
我想实验和模拟长过程。某种方法将花费大量时间,但以非阻塞方式,无需交换线程。这是为了模拟一个非常冗长的方法,但经过 BlockHound 等证明是非阻塞的。
这种结构非常流行:
@Test
public void simulateLengthyProcessingOperationReactor() {
Flux.range(1,5000)
.map(a -> simulateLengthyProcessingOperation(a))
.subscribe(System.out::println);
}
public String simulateLengthyProcessingOperation(Integer input) {
simulateDelayBLOCKING();
return String.format("[%d] on thread [%s] at time [%s]", input, Thread.currentThread().getName(), new Date());
}
public void simulateDelayBLOCKING() {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Run Code Online (Sandbox Code Playgroud)
但它正在阻塞。(我知道有,Mono.fromCallable(() ->但这不是问题)
是否可以做同样的事情,模拟延迟,但非阻塞?另外,.delay不会达到预期的结果(在同一反应管道上模拟非阻塞的冗长方法)
@Test
public void simulateLengthyProcessingOperationReactor() {
Flux.range(1,5000)
.map(a -> simulateLengthyProcessingOperation(a))
.subscribe(System.out::println);
}
public String simulateLengthyProcessingOperation(Integer input) {
simulateDelay_NON_blocking();
return String.format("[%d] on thread [%s] at time [%s]", input, Thread.currentThread().getName(), new Date());
}
public void simulateDelay_NON_blocking() {
//simulate lengthy process, but WITHOUT blocking
}
Run Code Online (Sandbox Code Playgroud)
谢谢
当然可以,有一系列方法.delay...()
例如,您可以delayElements()在此处阅读有关方法的信息:
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#delayElements-java.time.Duration-
您应该知道它将执行线程切换到另一个Scheduler。信号被延迟并在并行默认调度程序上继续。
在最简单的情况下,它看起来像这样:
public void simulateLengthyProcessingOperationReactor() {
Flux.range(1,5000)
.delayElements(Duration.ofMillis(1000L)) // delay each element for 1000 millis
.subscribe(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)
根据您的情况,您可以这样编写代码:
@Test
public void simulateLengthyProcessingOperationReactor() {
Flux.range(1,5000)
.concatMap(this::simulateDelay_NON_blocking)
.subscribe(System.out::println);
}
public Mono<String> simulateDelay_NON_blocking(Integer input) {
//simulate lengthy process, but WITHOUT blocking
return Mono.delay(Duration.ofMillis(1000L))
.map(__ -> String.format("[%d] on thread [%s] at time [%s]",
input, Thread.currentThread().getName(), new Date()));
}
Run Code Online (Sandbox Code Playgroud)