环境:Spring Boot 2.3.1、Java 11
我已经尝试了一些东西(也与 spring 的示例应用程序进行比较),但到目前为止,我未能成功创建WebClient需要ReactiveClientRegistrationRepository.
启动我的 spring-boot 应用程序时出现以下异常:
required a bean of type 'org.springframework.security.oauth2.client.registration.ReactiveClientRegistrationRepository'
Run Code Online (Sandbox Code Playgroud)
我理解 spring-boot-autoconfigure 的方式应该使用ReactiveOAuth2ClientAutoConfiguration,因为在 yml 文件中给出了所需的属性。
根据一些代码片段,如果缺少某些内容,我可以提供更多信息以获取整个上下文
主级
@Slf4j
@SpringBootApplication
@EnableConfigurationProperties(MyAppConfigurationProperties.class)
public class MyApp{
public static void main(final String[] args) {
SpringApplication.run(MyApp.class, args);
}
}
Run Code Online (Sandbox Code Playgroud)
配置:
@Configuration
//@Import(ReactiveOAuth2ClientAutoConfiguration.class) // in the test it works with this, but should not be required: spring-boot-autoconfigure
public class MyRestClientConfig {
@Bean
WebClient myWebClient(WebClient.Builder builder, ReactiveClientRegistrationRepository clientRegistrations) {
//content not relevant to this …Run Code Online (Sandbox Code Playgroud) 我有一个具有以下依赖项的项目:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-hateoas</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<!-- Streaming/Kafka -->
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- JPA -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<exclusions>
<exclusion>
<!--
HikariCP is excluded so tomcat connection is used.
This is necessary to set "connection-properties: v$session.program" param …Run Code Online (Sandbox Code Playgroud) 我相信这是记录请求和响应的标准,以便跟踪用户报告的任何问题并将其与 Splunk 等工具集成。
到目前为止,我已经找到了两种方法,但没有一种方法能够真正正确地解决这个问题:
有人甚至写道它是针对反应式堆栈的: https: //stackoverflow.com/a/45280764/4950459
所以我有疑问:
logging reactive-programming spring-boot spring-webflux spring-reactive
在我的处理函数中我有这个方法
public Mono<ServerResponse> itemsEx(ServerRequest serverRequest) {
throw new RuntimeException("RuntimeException Occured");
}
Run Code Online (Sandbox Code Playgroud)
现在,我想处理这个异常,所以我重写AbstractErrorWebExceptionHandler并创建这个类。
package com.learnreactivespring.learnreactivespring.exception;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.web.ResourceProperties;
import org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler;
import org.springframework.boot.web.error.ErrorAttributeOptions;
import org.springframework.boot.web.reactive.error.ErrorAttributes;
import org.springframework.context.ApplicationContext;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.*;
import reactor.core.publisher.Mono;
import java.util.Map;
@Component
@Slf4j
public class FunctionalErrorWebExceptionHandler extends AbstractErrorWebExceptionHandler {
public FunctionalErrorWebExceptionHandler(ErrorAttributes errorAttributes,
ApplicationContext applicationContext,
ServerCodecConfigurer serverCodecConfigurer) {
super(errorAttributes, new ResourceProperties(), applicationContext);
super.setMessageWriters(serverCodecConfigurer.getWriters());
super.setMessageReaders(serverCodecConfigurer.getReaders());
}
@Override
protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse);
}
private Mono<ServerResponse> renderErrorResponse(ServerRequest …Run Code Online (Sandbox Code Playgroud) 通过调用Flux.push和使用pushlambda 表达式中的接收器直接创建 Flux与使用由 a 提供的接收器有DirectProcessor什么区别?
在 Flux 只发出几个事件的最小示例中,我可以做
Flux.<String>push(emitter -> {
emitter.next("One");
emitter.next("Two");
emitter.complete();
});
Run Code Online (Sandbox Code Playgroud)
与使用 DirectProcessor
var emitter = DirectProcessor.<String>create().sink();
emitter.next("One");
emitter.next("Two");
emitter.complete();
Run Code Online (Sandbox Code Playgroud)
澄清一下:我知道我可以Flux.just在这里使用,但我的用例实际上是在 Spring 的@EventListeners 和 Spring WebFlux之间建立一座桥梁,我想为每个传入的特定资源的 SSE 请求创建一个 Flux,然后将事件发布到这个通量。
有人能告诉我,这两种方法是否都有效?当然,肯定有一些不同。特别是,Reactor Reference Guide部分关于DirectProcessor状态:
另一方面,它具有不处理背压的限制。因此,如果您通过 DirectProcessor 推送 N 个元素但至少其中一个订阅者的请求少于 N,则 DirectProcessor 会向其订阅者发出 IllegalStateException 信号。
这意味着什么?
[编辑:]在我使用的问题的早期版本中,Flux.generate()而不是Flux.push(),这显然是错误的,因为 generate 最多可以创建一个事件。
[编辑 2:] @123 向我询问了我想要实现的目标的完整示例。忍受我,这是一个 SO 问题的相当数量的代码:
我想在(非反应式)Spring 域事件侦听器和反应式 Flux 之间建立一座桥梁,然后我可以在 …
我使用 Spring Cloud Gateway 作为边缘服务器。这就是流程
如果请求具有名为“x-foo”的标头,则查找标头值,从另一台服务器获取字符串并将该字符串作为响应发送,而不是实际代理请求。
这是 Filter DSL 的代码
@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
return builder.routes()
.route("foo-filter", r -> r.header('x-foo').and().header("x-intercepted").negate()
.filters(f -> f.filter(fooFilter))
.uri("http://localhost:8081")) // 8081 is self port, there are other proxy related configurations too
.build();
}
Run Code Online (Sandbox Code Playgroud)
Foo 过滤器的代码
@Component
@Slf4j
public class FooFilter implements GatewayFilter {
@Autowired
private ReactiveRedisOperations<String, String> redisOps;
@Value("${header-name}")
private String headerName;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
var foo = request.getHeaders().getFirst(headerName);
return redisOps.opsForHash()
.get("foo:" + foo, …Run Code Online (Sandbox Code Playgroud) 我有一个使用 WebFlux 和 REST API 的反应式 Spring 应用程序。每当用户调用我的 API 时,我都需要调用公开 WSDL 的 SOAP 服务,执行一些操作并返回结果。
如何将对 SOAP 服务的调用与 Reactive WebFlux 框架结合起来?
在我看来,我可以通过两种不同的方式来做到这一点:
第一种方法是我的偏好,但我不知道该怎么做。
这里也提出了类似的问题: Reactive Spring WebClient - Making a SOAP call,它引用了这篇博客文章(https://blog.godatadriven.com/jaxws-reactive-client)。但我无法让这个例子发挥作用。
在 Gradle 插件中使用,wsdl2java我可以使用异步方法创建客户端界面,但我不明白如何使用它。使用时WebServiceGatewaySupport,我根本不使用生成的接口或其方法。相反,我调用通用marshalSendAndReceive方法
public class MySoapClient extends WebServiceGatewaySupport {
public QueryResponse execute() {
Query query = new ObjectFactory().createQuery();
// Further create and set the domain object …Run Code Online (Sandbox Code Playgroud) 在我的请求处理程序中,如果传入的内容accountId无法转换为有效的,ObjectId我想捕获错误并发回有意义的消息;但是,这样做会导致返回类型不兼容,并且我无法弄清楚如何实现这个非常简单的用例。
我的代码:
@GetMapping("/{accountId}")
public Mono<ResponseEntity<Account>> get(@PathVariable String accountId) {
log.debug(GETTING_DATA_FOR_ACCOUNT, accountId);
try {
ObjectId id = new ObjectId(accountId);
return repository.findById(id)
.map(ResponseEntity::ok)
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
} catch (IllegalArgumentException ex) {
log.error(MALFORMED_OBJECT_ID, accountId);
// TODO(marco): find a way to return the custom error message. This seems to be currently
// impossible with the Reactive API, as using body(message) changes the return type to
// be incompatible (and Mono<ResponseEntity<?>> does not seem to cut it).
return Mono.just(ResponseEntity.badRequest().build());
}
}
Run Code Online (Sandbox Code Playgroud)
该body(T …
在 Flux 中,地图函数也为通量中的每个项目执行。对于通量中的每个项目(发射的),doOnNext 函数也被执行。从用户角度看有什么区别?为什么存在两种类似的方法?可以用简单的易用性来解释。
reactive-programming reactive spring-webflux spring-reactive
我想在我的反应式 Spring Boot 应用程序中使用外部 OAuth 提供程序对用户进行身份验证。
按照官方教程,我成功地使用预配置的提供程序(Google、Github 等)实现了该流程。可以使用这些属性将配置更改为未预先配置的提供程序,例如:
spring.security.oauth2.client.registration.<providerName>.client-id=<clientId>
spring.security.oauth2.client.registration.<providerName>.client-secret=<clientSecret>
spring.security.oauth2.client.registration.<providerName>.redirect-uri={baseUrl}/login/oauth2/code/<providerName>
spring.security.oauth2.client.registration.<providerName>.provider=<providerName>
spring.security.oauth2.client.registration.<providerName>.client-authentication-method=basic
spring.security.oauth2.client.registration.<providerName>.authorization-grant-type=authorization_code
spring.security.oauth2.client.provider.<providerName>.authorization-uri=https://api.<providerName>.com/authorize
spring.security.oauth2.client.provider.<providerName>.token-uri=https://api.<providerName>.com/token
Run Code Online (Sandbox Code Playgroud)
通过此设置,将提示用户登录页面,并使用以下命令调用指定的重定向 url authCode:
但是,返回此错误页面,控制台中没有日志条目或异常(即使我设置了该logging.level.org.springframework.security=DEBUG属性)。
可能是什么问题?我可以从哪里开始调试这个?
spring spring-security spring-boot spring-security-oauth2 spring-reactive
是否可以执行类似下面代码的操作?我有一项服务进行 API 调用,另一项服务返回值流。我需要根据 API 调用返回的值修改每个值。
return Flux.zip(
someMono.get(),
someFlux.Get(),
(d, t) -> {
//HERE D IS ALWAYS THE SAME AND T IS EVERY NEW FLUX VALUE
});
Run Code Online (Sandbox Code Playgroud)
我已经尝试过使用 Mono 的 .repeat() 并且它可以工作,但是每次有新的 Flux 值时它都会调用该方法,并且它是一个 API 调用,所以它不好。
是否可以?
如何在 Spring Webflux 中获取引用网址?我尝试查看ServerWebExchange exchange对象中的标头属性,但找不到相同的属性。有人可以帮我吗?
spring-reactive ×12
java ×6
spring-boot ×5
spring ×4
logging ×1
reactive ×1
soap ×1
spring-ws ×1
spring5 ×1