小编Mur*_*kan的帖子

Spring Reactor 线程模型

Spring Webflux (v 2.0.1.RELEASE) 的新手警报。

我想将 Spring Webflux 用于后端(Webless)应用程序,以处理来自 JMS 侦听器的大量数据。

我的理解是 Spring Webflux 提供了一个]非阻塞/异步并发模型。但是,我遇到了一个需要帮助的基本问题。作为免责声明,响应式编程的整个概念对我来说是非常新的,我仍然处于这种范式转变的过程中。

考虑这个代码:

Mono.just("ONE")
.map(item -> func(" A " + item))
.map(item -> func(" B " + item))
.map(item -> func(" C " + item))
.subscribe(System.out::println);

Mono.just("TWO")
.map(item -> func(" A " + item))
.map(item -> func(" B " + item))
.map(item -> func(" C " + item))
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

我从文档中了解到,在调用“订阅”函数之前,事件处理链不会发生任何事情。

但是在内部,spring 是否使用(如果愿意)为“map”函数内的每个函数异步使用单独的线程?如果 spring 为这些链使用“单一”线程,那么这里的真正目的是什么?它不是基于不同语法的阻塞和单线程模型吗?

我观察到代码总是按顺序运行并使用相同的线程。spring webflux的线程模型是什么?

java spring project-reactor spring-webflux

5
推荐指数
2
解决办法
7626
查看次数

Spring Security WebFlux - 带有身份验证的主体

我想实现简单的 Spring Security WebFlux 应用程序。
我想使用像这样的 JSON 消息

{
   'username': 'admin', 
   'password': 'adminPassword'
} 
Run Code Online (Sandbox Code Playgroud)

在正文中(对 /signin 的 POST 请求)以登录我的应用程序。

我做了什么?

我创建了这个配置

@Configuration
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity(proxyTargetClass = true)
public class WebFluxSecurityConfig {

    @Autowired
    private ReactiveUserDetailsService userDetailsService;

    @Autowired
    private ObjectMapper mapper;

    @Bean
    public PasswordEncoder passwordEncoder() {
        return new BCryptPasswordEncoder(11);
    }

    @Bean
    public ServerSecurityContextRepository securityContextRepository() {
        WebSessionServerSecurityContextRepository securityContextRepository =
                new WebSessionServerSecurityContextRepository();

        securityContextRepository.setSpringSecurityContextAttrName("securityContext");

        return securityContextRepository;
    }

    @Bean
    public ReactiveAuthenticationManager authenticationManager() {
        UserDetailsRepositoryReactiveAuthenticationManager authenticationManager =
                new UserDetailsRepositoryReactiveAuthenticationManager(userDetailsService);

        authenticationManager.setPasswordEncoder(passwordEncoder());

        return authenticationManager;
    }

    @Bean
    public AuthenticationWebFilter authenticationWebFilter() { …
Run Code Online (Sandbox Code Playgroud)

java spring-security reactor-netty spring-webflux

4
推荐指数
1
解决办法
1万
查看次数

Spring Weflux Rest API(带注释的控制器)的总处理时间

我们开始使用 Spring Webflux,我们正在为 REST API 使用带注释的控制器。我们想测量 Spring 引导服务器处理请求所需的总时间。看起来我们可以使用 Spring WebFilter但是我不确定如何设置 StartTime(ServerWebExchange或其他请求标头中的某种属性)?此外,一旦响应完成,我们如何获得 startTime 并计算时间差?

谢谢!

spring spring-boot spring-webflux

3
推荐指数
1
解决办法
1993
查看次数

在 WebFilter 类中抛出异常并在 ControllerAdvice 类中处理它

我正在尝试实现一个WebFilter检查 JWT 的方法,如果检查失败或结果无效,则抛出异常。我有一个@ControllerAdvice处理这些异常的类。但这不起作用。

这是WebFilter班级:

@Component
public class OktaAccessTokenFilter implements WebFilter {

private JwtVerifier jwtVerifier;

   @Autowired
   public OktaAccessTokenFilter(JwtVerifier jwtVerifier) {
        this.jwtVerifier = jwtVerifier;
   }

   @Override
   public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return Optional.ofNullable(exchange.getRequest().getHeaders().get("Authorization"))
            .flatMap(list -> list.stream().findFirst())
            .filter(authHeader -> !authHeader.isEmpty() && authHeader.startsWith("Bearer "))
            .map(authHeader -> authHeader.replaceFirst("^Bearer", ""))
            .map(jwtString -> {
                try {
                    jwtVerifier.decodeAccessToken(jwtString);
                } catch (JoseException e) {
                    throw new DecodeAccessTokenException();
                }
                return chain.filter(exchange);
            }).orElseThrow(() -> new AuthorizationException());
      }
}
Run Code Online (Sandbox Code Playgroud)

以及异常处理程序:

@ControllerAdvice
public class SecurityExceptionHandler …
Run Code Online (Sandbox Code Playgroud)

java spring-webflux

3
推荐指数
1
解决办法
5290
查看次数

无阻塞地将 Mono 转换为 Pojo

有没有办法将 Mono 对象转换为 java Pojo?我有一个 Web 客户端连接到 3rd 方 REST 服务,Mono我必须提取该对象并询问它,而不是返回。

我找到的所有示例都返回了,Mono<Pojo>但我必须获得 Pojo 本身。目前,我通过调用block()Pojo 来做到这一点,但有没有更好的方法来避免阻塞?

该块的问题在于,在几次运行后,它开始抛出一些错误,例如块因错误而终止。

 public MyPojo getPojo(){
     return myWebClient.get()
                .uri(generateUrl())
                .headers(createHttpHeaders(headersMap))
                .exchange()
                .flatMap(evaluateResponseStatus())
                .block();
}


private Function<ClientResponse, Mono<? extends MyPojo>> evaluateResponseStatus() {
      return response -> {
            if (response.statusCode() == HttpStatus.OK) {
                return response.bodyToMono(MyPojo.class);
            }
            if (webClientUtils.isError(response.statusCode())) {
                throw myHttpException(response);
                // This invokes my exceptionAdvice
                // but after few runs its ignored and 500 error is returned.
            }
            return Mono.empty();
        };
    }
Run Code Online (Sandbox Code Playgroud)

java web-client project-reactor spring-webflux

2
推荐指数
1
解决办法
1万
查看次数