标签: spring-reactive

spring-boot ReactiveClientRegistrationRepository 未找到

环境:Spring Boot 2.3.1、Java 11

我已经尝试了一些东西(也与 spring 的示例应用程序进行比较),但到目前为止,我未能成功创建WebClient需要ReactiveClientRegistrationRepository.

启动我的 spring-boot 应用程序时出现以下异常:

required a bean of type 'org.springframework.security.oauth2.client.registration.ReactiveClientRegistrationRepository'
Run Code Online (Sandbox Code Playgroud)

我理解 spring-boot-autoconfigure 的方式应该使用ReactiveOAuth2ClientAutoConfiguration,因为在 yml 文件中给出了所需的属性。

根据一些代码片段,如果缺少某些内容,我可以提供更多信息以获取整个上下文

主级

@Slf4j
@SpringBootApplication
@EnableConfigurationProperties(MyAppConfigurationProperties.class)
public class MyApp{

    public static void main(final String[] args) {
        SpringApplication.run(MyApp.class, args);
    }
}
Run Code Online (Sandbox Code Playgroud)

配置:

@Configuration
//@Import(ReactiveOAuth2ClientAutoConfiguration.class) // in the test it works with this, but should not be required: spring-boot-autoconfigure
public class MyRestClientConfig {

  @Bean
    WebClient myWebClient(WebClient.Builder builder, ReactiveClientRegistrationRepository clientRegistrations) {
//content not relevant to this …
Run Code Online (Sandbox Code Playgroud)

java spring-boot spring-webclient spring-reactive

11
推荐指数
1
解决办法
2万
查看次数

如何摆脱 conversionServicePostProcessor bean 冲突?

我有一个具有以下依赖项的项目:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-hateoas</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-config</artifactId>
    </dependency>

    <!-- Streaming/Kafka -->
    <dependency>
        <groupId>io.projectreactor.kafka</groupId>
        <artifactId>reactor-kafka</artifactId>
        <version>1.1.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.0.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- JPA -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
        <exclusions>
            <exclusion>
                <!--
                HikariCP is excluded so tomcat connection is used.
                This is necessary to set "connection-properties: v$session.program" param …
Run Code Online (Sandbox Code Playgroud)

spring spring-security spring-boot spring-reactive

7
推荐指数
0
解决办法
2961
查看次数

Webflux 请求/响应日志记录

我相信这是记录请求和响应的标准,以便跟踪用户报告的任何问题并将其与 Splunk 等工具集成。

到目前为止,我已经找到了两种方法,但没有一种方法能够真正正确地解决这个问题:

  1. /sf/answers/4495259891/
  2. /sf/answers/3355205801/

有人甚至写道它是针对反应式堆栈的: https: //stackoverflow.com/a/45280764/4950459

所以我有疑问:

  1. Spring团队没有推荐的方法吗?
  2. 如果 webflux 中的日志记录是针对反应式堆栈的,那么我们应该如何实际记录请求/响应?
  3. 如果 Webflux 不是记录请求/响应的合适位置,那么我们应该从哪里获取这些信息?Nginx 可能会使用请求 ID 来记录它 - 但这是解决这个问题的正确方法吗?

logging reactive-programming spring-boot spring-webflux spring-reactive

7
推荐指数
0
解决办法
3806
查看次数

Spring Reactive 中自定义 ErrorAttributeOptions 的问题

在我的处理函数中我有这个方法

public Mono<ServerResponse> itemsEx(ServerRequest serverRequest) {
    throw new RuntimeException("RuntimeException Occured");
}
Run Code Online (Sandbox Code Playgroud)

现在,我想处理这个异常,所以我重写AbstractErrorWebExceptionHandler并创建这个类。

package com.learnreactivespring.learnreactivespring.exception;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.web.ResourceProperties;
import org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler;
import org.springframework.boot.web.error.ErrorAttributeOptions;
import org.springframework.boot.web.reactive.error.ErrorAttributes;
import org.springframework.context.ApplicationContext;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.*;
import reactor.core.publisher.Mono;

import java.util.Map;

@Component
@Slf4j
public class FunctionalErrorWebExceptionHandler extends AbstractErrorWebExceptionHandler {


public FunctionalErrorWebExceptionHandler(ErrorAttributes errorAttributes,
                                          ApplicationContext applicationContext,
                                          ServerCodecConfigurer serverCodecConfigurer) {
    super(errorAttributes, new ResourceProperties(), applicationContext);
    super.setMessageWriters(serverCodecConfigurer.getWriters());
    super.setMessageReaders(serverCodecConfigurer.getReaders());
}

@Override
protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
    return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse);

}

private  Mono<ServerResponse> renderErrorResponse(ServerRequest …
Run Code Online (Sandbox Code Playgroud)

java spring spring-reactive

6
推荐指数
1
解决办法
4442
查看次数

如何在非反应式 Spring EventListener 和反应式 Flux 之间架起桥梁

通过调用Flux.push和使用pushlambda 表达式中的接收器直接创建 Flux与使用由 a 提供的接收器有DirectProcessor什么区别?

在 Flux 只发出几个事件的最小示例中,我可以做

Flux.<String>push(emitter -> {
   emitter.next("One");
   emitter.next("Two");
   emitter.complete();
 });
Run Code Online (Sandbox Code Playgroud)

与使用 DirectProcessor

var emitter = DirectProcessor.<String>create().sink();
emitter.next("One");
emitter.next("Two");
emitter.complete();
Run Code Online (Sandbox Code Playgroud)

澄清一下:我知道我可以Flux.just在这里使用,但我的用例实际上是在 Spring 的@EventListeners 和 Spring WebFlux之间建立一座桥梁,我想为每个传入的特定资源的 SSE 请求创建一个 Flux,然后将事件发布到这个通量。

有人能告诉我,这两种方法是否都有效?当然,肯定有一些不同。特别是,Reactor Reference Guide部分关于DirectProcessor状态:

另一方面,它具有不处理背压的限制。因此,如果您通过 DirectProcessor 推送 N 个元素但至少其中一个订阅者的请求少于 N,则 DirectProcessor 会向其订阅者发出 IllegalStateException 信号。

这意味着什么?

[编辑:]在我使用的问题的早期版本中,Flux.generate()而不是Flux.push(),这显然是错误的,因为 generate 最多可以创建一个事件。

[编辑 2:] @123 向我询问了我想要实现的目标的完整示例。忍受我,这是一个 SO 问题的相当数量的代码:

我实际尝试做的完整示例

我想在(非反应式)Spring 域事件侦听器和反应式 Flux 之间建立一座桥梁,然后我可以在 …

spring spring-webflux spring-reactive spring-reactor

5
推荐指数
2
解决办法
1861
查看次数

Spring Cloud网关在过滤器中发送响应

我使用 Spring Cloud Gateway 作为边缘服务器。这就是流程

如果请求具有名为“x-foo”的标头,则查找标头值,从另一台服务器获取字符串并将该字符串作为响应发送,而不是实际代理请求。

这是 Filter DSL 的代码

@Bean
    public RouteLocator routes(RouteLocatorBuilder builder) {
        return builder.routes()
                .route("foo-filter", r -> r.header('x-foo').and().header("x-intercepted").negate()
                        .filters(f -> f.filter(fooFilter))
                        .uri("http://localhost:8081")) // 8081 is self port, there are other proxy related configurations too
                .build();
    }
Run Code Online (Sandbox Code Playgroud)

Foo 过滤器的代码

@Component
@Slf4j
public class FooFilter implements GatewayFilter {

    @Autowired
    private ReactiveRedisOperations<String, String> redisOps;

    @Value("${header-name}")
    private String headerName;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        var foo = request.getHeaders().getFirst(headerName);

        return redisOps.opsForHash()
                .get("foo:" + foo, …
Run Code Online (Sandbox Code Playgroud)

spring-boot spring-cloud-gateway spring-reactive

5
推荐指数
1
解决办法
9009
查看次数

在 Spring WebFlux 中进行异步 SOAP 调用

我有一个使用 WebFlux 和 REST API 的反应式 Spring 应用程序。每当用户调用我的 API 时,我都需要调用公开 WSDL 的 SOAP 服务,执行一些操作并返回结果。

如何将对 SOAP 服务的调用与 Reactive WebFlux 框架结合起来?

在我看来,我可以通过两种不同的方式来做到这一点:

  1. 使用 WebFlux 的 WebClient 构造并发送 SOAP 消息。
  2. 使用 Mono / Flux 中的 WebServiceGatewaySupport 包装同步调用。

第一种方法是我的偏好,但我不知道该怎么做。

这里也提出了类似的问题: Reactive Spring WebClient - Making a SOAP call,它引用了这篇博客文章(https://blog.godatadriven.com/jaxws-reactive-client)。但我无法让这个例子发挥作用。

在 Gradle 插件中使用,wsdl2java我可以使用异步方法创建客户端界面,但我不明白如何使用它。使用时WebServiceGatewaySupport,我根本不使用生成的接口或其方法。相反,我调用通用marshalSendAndReceive方法

public class MySoapClient extends WebServiceGatewaySupport {

    public QueryResponse execute() {
        Query query = new ObjectFactory().createQuery();
        // Further create and set the domain object …
Run Code Online (Sandbox Code Playgroud)

java soap spring-ws spring-webflux spring-reactive

5
推荐指数
1
解决办法
9321
查看次数

Spring Reactive WebFlux - 如何自定义BadRequest错误消息

在我的请求处理程序中,如果传入的内容accountId无法转换为有效的,ObjectId我想捕获错误并发回有意义的消息;但是,这样做会导致返回类型不兼容,并且我无法弄清楚如何实现这个非常简单的用例。

我的代码:

  @GetMapping("/{accountId}")
  public Mono<ResponseEntity<Account>> get(@PathVariable String accountId) {
      log.debug(GETTING_DATA_FOR_ACCOUNT, accountId);

      try {
        ObjectId id = new ObjectId(accountId);
        return repository.findById(id)
            .map(ResponseEntity::ok)
            .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
      } catch (IllegalArgumentException ex) {
        log.error(MALFORMED_OBJECT_ID, accountId);
        // TODO(marco): find a way to return the custom error message. This seems to be currently
        //  impossible with the Reactive API, as using body(message) changes the return type to
        //  be incompatible (and Mono<ResponseEntity<?>> does not seem to cut it).
        return Mono.just(ResponseEntity.badRequest().build());
      }
  }
Run Code Online (Sandbox Code Playgroud)

body(T …

java spring-webflux spring-reactive

5
推荐指数
1
解决办法
6562
查看次数

map 和 doOnNext 有什么区别?(即项目反应堆)

在 Flux 中,地图函数也为通量中的每个项目执行。对于通量中的每个项目(发射的),doOnNext 函数也被执行。从用户角度看有什么区别?为什么存在两种类似的方法?可以用简单的易用性来解释。

reactive-programming reactive spring-webflux spring-reactive

4
推荐指数
2
解决办法
4278
查看次数

Spring Security React - 如何调试“无效凭据”错误?

我想在我的反应式 Spring Boot 应用程序中使用外部 OAuth 提供程序对用户进行身份验证。

按照官方教程,我成功地使用预配置的提供程序(Google、Github 等)实现了该流程。可以使用这些属性将配置更改为未预先配置的提供程序,例如:

spring.security.oauth2.client.registration.<providerName>.client-id=<clientId>
spring.security.oauth2.client.registration.<providerName>.client-secret=<clientSecret>
spring.security.oauth2.client.registration.<providerName>.redirect-uri={baseUrl}/login/oauth2/code/<providerName>
spring.security.oauth2.client.registration.<providerName>.provider=<providerName>
spring.security.oauth2.client.registration.<providerName>.client-authentication-method=basic
spring.security.oauth2.client.registration.<providerName>.authorization-grant-type=authorization_code

spring.security.oauth2.client.provider.<providerName>.authorization-uri=https://api.<providerName>.com/authorize
spring.security.oauth2.client.provider.<providerName>.token-uri=https://api.<providerName>.com/token
Run Code Online (Sandbox Code Playgroud)

通过此设置,将提示用户登录页面,并使用以下命令调用指定的重定向 url authCode

调用的重定向网址

但是,返回此错误页面,控制台中没有日志条目或异常(即使我设置了该logging.level.org.springframework.security=DEBUG属性)。

无效凭据错误

可能是什么问题?我可以从哪里开始调试这个?

spring spring-security spring-boot spring-security-oauth2 spring-reactive

4
推荐指数
1
解决办法
4720
查看次数

您可以 Flux.zip 一个单声道和一个通量,并为每个通量值重复单声道值吗?

是否可以执行类似下面代码的操作?我有一项服务进行 API 调用,另一项服务返回值流。我需要根据 API 调用返回的值修改每个值。

return Flux.zip(
                     someMono.get(),
                     someFlux.Get(),
                     (d, t) -> {
                         //HERE D IS ALWAYS THE SAME AND T IS EVERY NEW FLUX VALUE
                     });
Run Code Online (Sandbox Code Playgroud)

我已经尝试过使用 Mono 的 .repeat() 并且它可以工作,但是每次有新的 Flux 值时它都会调用该方法,并且它是一个 API 调用,所以它不好。

是否可以?

java spring-webflux spring-reactive

3
推荐指数
2
解决办法
6073
查看次数

如何在 Spring Webflux 中获取引用网址?

如何在 Spring Webflux 中获取引用网址?我尝试查看ServerWebExchange exchange对象中的标头属性,但找不到相同的属性。有人可以帮我吗?

java spring-webflux spring-boot-2 spring-reactive spring5

3
推荐指数
1
解决办法
4322
查看次数