Spring Webflux (v 2.0.1.RELEASE) 的新手警报。
我想将 Spring Webflux 用于后端(Webless)应用程序,以处理来自 JMS 侦听器的大量数据。
我的理解是 Spring Webflux 提供了一个]非阻塞/异步并发模型。但是,我遇到了一个需要帮助的基本问题。作为免责声明,响应式编程的整个概念对我来说是非常新的,我仍然处于这种范式转变的过程中。
考虑这个代码:
Mono.just("ONE")
.map(item -> func(" A " + item))
.map(item -> func(" B " + item))
.map(item -> func(" C " + item))
.subscribe(System.out::println);
Mono.just("TWO")
.map(item -> func(" A " + item))
.map(item -> func(" B " + item))
.map(item -> func(" C " + item))
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
我从文档中了解到,在调用“订阅”函数之前,事件处理链不会发生任何事情。
但是在内部,spring 是否使用(如果愿意)为“map”函数内的每个函数异步使用单独的线程?如果 spring 为这些链使用“单一”线程,那么这里的真正目的是什么?它不是基于不同语法的阻塞和单线程模型吗?
我观察到代码总是按顺序运行并使用相同的线程。spring webflux的线程模型是什么?
我想实现简单的 Spring Security WebFlux 应用程序。
我想使用像这样的 JSON 消息
{
'username': 'admin',
'password': 'adminPassword'
}
Run Code Online (Sandbox Code Playgroud)
在正文中(对 /signin 的 POST 请求)以登录我的应用程序。
我做了什么?
我创建了这个配置
@Configuration
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity(proxyTargetClass = true)
public class WebFluxSecurityConfig {
@Autowired
private ReactiveUserDetailsService userDetailsService;
@Autowired
private ObjectMapper mapper;
@Bean
public PasswordEncoder passwordEncoder() {
return new BCryptPasswordEncoder(11);
}
@Bean
public ServerSecurityContextRepository securityContextRepository() {
WebSessionServerSecurityContextRepository securityContextRepository =
new WebSessionServerSecurityContextRepository();
securityContextRepository.setSpringSecurityContextAttrName("securityContext");
return securityContextRepository;
}
@Bean
public ReactiveAuthenticationManager authenticationManager() {
UserDetailsRepositoryReactiveAuthenticationManager authenticationManager =
new UserDetailsRepositoryReactiveAuthenticationManager(userDetailsService);
authenticationManager.setPasswordEncoder(passwordEncoder());
return authenticationManager;
}
@Bean
public AuthenticationWebFilter authenticationWebFilter() { …Run Code Online (Sandbox Code Playgroud) 我们开始使用 Spring Webflux,我们正在为 REST API 使用带注释的控制器。我们想测量 Spring 引导服务器处理请求所需的总时间。看起来我们可以使用 Spring WebFilter但是我不确定如何设置 StartTime(ServerWebExchange或其他请求标头中的某种属性)?此外,一旦响应完成,我们如何获得 startTime 并计算时间差?
谢谢!
我正在尝试实现一个WebFilter检查 JWT 的方法,如果检查失败或结果无效,则抛出异常。我有一个@ControllerAdvice处理这些异常的类。但这不起作用。
WebFilter班级:@Component
public class OktaAccessTokenFilter implements WebFilter {
private JwtVerifier jwtVerifier;
@Autowired
public OktaAccessTokenFilter(JwtVerifier jwtVerifier) {
this.jwtVerifier = jwtVerifier;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return Optional.ofNullable(exchange.getRequest().getHeaders().get("Authorization"))
.flatMap(list -> list.stream().findFirst())
.filter(authHeader -> !authHeader.isEmpty() && authHeader.startsWith("Bearer "))
.map(authHeader -> authHeader.replaceFirst("^Bearer", ""))
.map(jwtString -> {
try {
jwtVerifier.decodeAccessToken(jwtString);
} catch (JoseException e) {
throw new DecodeAccessTokenException();
}
return chain.filter(exchange);
}).orElseThrow(() -> new AuthorizationException());
}
}
Run Code Online (Sandbox Code Playgroud)
@ControllerAdvice
public class SecurityExceptionHandler …Run Code Online (Sandbox Code Playgroud) 有没有办法将 Mono 对象转换为 java Pojo?我有一个 Web 客户端连接到 3rd 方 REST 服务,Mono我必须提取该对象并询问它,而不是返回。
我找到的所有示例都返回了,Mono<Pojo>但我必须获得 Pojo 本身。目前,我通过调用block()Pojo 来做到这一点,但有没有更好的方法来避免阻塞?
该块的问题在于,在几次运行后,它开始抛出一些错误,例如块因错误而终止。
public MyPojo getPojo(){
return myWebClient.get()
.uri(generateUrl())
.headers(createHttpHeaders(headersMap))
.exchange()
.flatMap(evaluateResponseStatus())
.block();
}
private Function<ClientResponse, Mono<? extends MyPojo>> evaluateResponseStatus() {
return response -> {
if (response.statusCode() == HttpStatus.OK) {
return response.bodyToMono(MyPojo.class);
}
if (webClientUtils.isError(response.statusCode())) {
throw myHttpException(response);
// This invokes my exceptionAdvice
// but after few runs its ignored and 500 error is returned.
}
return Mono.empty();
};
}
Run Code Online (Sandbox Code Playgroud)