Java 19 中引入了虚拟线程JEP-425作为预览功能。
在对 Java 虚拟线程(Project Loom)的概念(有时称为轻量级线程(有时称为纤维或绿色线程))进行一些研究之后,我对它们与反应式库的潜在使用非常感兴趣,例如基于 Spring WebFlux在 Project Reactor(反应流实现)和 Netty 上,用于有效地进行阻塞调用。
如今,大多数 JVM 实现都将 Java 线程实现为操作系统线程的瘦直接包装器,有时称为重量级、操作系统管理的线程平台线程。
虽然平台线程一次只能执行一个线程,但是当当前执行的虚拟线程进行阻塞调用(例如网络、文件系统、数据库调用)时,虚拟线程能够切换到执行不同的虚拟线程。
因此,在 Reactor 中处理阻塞调用时,我们使用以下构造:
Mono.fromCallable(() -> {
return blockingOperation();
}).subscribeOn(Schedulers.boundedElastic());
Run Code Online (Sandbox Code Playgroud)
我们subcribeOn()提供了一个Scheduler创建专用线程来执行该阻塞操作的方法。然而,这意味着线程最终将被阻塞,因此,由于我们仍然使用老式的线程模型,我们实际上会阻塞平台线程,这仍然不是处理 CPU 资源的真正有效的方式。
所以,问题是,我们是否可以直接使用具有反应式框架的虚拟线程来进行这样的阻塞调用,例如使用Executors.newVirtualThreadPerTaskExecutor():
创建一个执行器,为每个任务启动一个新的虚拟线程。Executor创建的线程数量是无限制的。
Mono.fromCallable(() -> {
return …Run Code Online (Sandbox Code Playgroud) java spring-boot project-reactor spring-webflux project-loom
我对 Reactive Stream 有点陌生,所以在使用Spring Webflux和Reactor时遇到了一个问题。
我制作了一个如下所示的片段:
@RestController
public class TestController {
@GetMapping("responsebody/flux")
public Flux<String> tt2() {
return Flux.range(1, 5)
.delayElements(Duration.ofMillis(1000))
.map(l -> "hi");
}
}
Run Code Online (Sandbox Code Playgroud)
而且,有趣的是,镶边分别显示序列中的每个元素,而不是当我仅使用浏览器请求时一次公开所有元素。(但开发工具立即显示全身)
但我想知道,即使 HTTP 1 只使用一个连接,并且服务器发送的数据放在 HTTP 协议的正文中,它是如何工作的。客户端如何知道哪个元素分隔每个元素以及序列何时完成?如果客户端还没有准备好使用反应流怎么办?
我不需要任何使用反应式库的代码,但想知道协议是如何工作的。
http project-reactor reactive-streams reactive spring-webflux
关于如何在方法中添加延迟但以非阻塞方式添加延迟的小问题。
模拟长流程的一种非常流行的方法是使用Thread.sleep();
但是,对于Reactor项目来说,这是一个阻塞操作。众所周知,在反应式项目中,我们不应该阻塞。
我想实验和模拟长过程。某种方法将花费大量时间,但以非阻塞方式,无需交换线程。这是为了模拟一个非常冗长的方法,但经过 BlockHound 等证明是非阻塞的。
这种结构非常流行:
@Test
public void simulateLengthyProcessingOperationReactor() {
Flux.range(1,5000)
.map(a -> simulateLengthyProcessingOperation(a))
.subscribe(System.out::println);
}
public String simulateLengthyProcessingOperation(Integer input) {
simulateDelayBLOCKING();
return String.format("[%d] on thread [%s] at time [%s]", input, Thread.currentThread().getName(), new Date());
}
public void simulateDelayBLOCKING() {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Run Code Online (Sandbox Code Playgroud)
但它正在阻塞。(我知道有,Mono.fromCallable(() ->但这不是问题)
是否可以做同样的事情,模拟延迟,但非阻塞?另外,.delay不会达到预期的结果(在同一反应管道上模拟非阻塞的冗长方法)
@Test
public void simulateLengthyProcessingOperationReactor() {
Flux.range(1,5000)
.map(a -> simulateLengthyProcessingOperation(a))
.subscribe(System.out::println);
}
public String simulateLengthyProcessingOperation(Integer input) {
simulateDelay_NON_blocking();
return String.format("[%d] on …Run Code Online (Sandbox Code Playgroud) java spring reactive-programming project-reactor spring-webflux
我有一个休息服务,我的请求主体 bean 注释有 javax.validation 等,并且在一个特定字段中我收到一个编码为字符串 base64 的@NotBlank @NotNull @Pattern文件,
那么,是否有注释,或者我如何编写自定义验证注释,以便它检查字符串是否确实是 Base64 字符串?
我只需要以注释形式进行这样的验证:
try {
Base64.getDecoder().decode(someString);
return true;
} catch(IllegalArgumentException iae) {
return false;
}
Run Code Online (Sandbox Code Playgroud)
提前谢谢