我正在用Spring 5 Webflux和Kotlin编写一个简单的应用程序.我试图以下列方式实现PUT端点:
PUT("/confs/{id}", {
val id = it.pathVariable("id")
ServerResponse.ok().body(service.save(it.bodyToMono(Item::class.java)), Item::class.java)
})
Run Code Online (Sandbox Code Playgroud)
保存的技巧是我尝试从项目中读取城市名称,解析地理坐标,在原始项目中覆盖它们,然后使用Spring Data Mongo Reactive repo保存到Mongo.
fun save(item: Mono<Item>): Mono<Item> {
val geo = item.flatMap {
val city = it.location?.city ?: "Somewhere"
geoService.resolveGeoFromCity(city)
}
val zipped = item.zipWith(geo)
.map {
it.t1.location?.geo = it.t2
it.t1
}
return repo.saveAll(zipped)
.toMono()
}
Run Code Online (Sandbox Code Playgroud)
解决地理坐标的代码如下:
@Service
class GeoService() {
val client = WebClient.create("https://maps.googleapis.com/maps/api/geocode/")
fun resolveGeoFromCity(city: String): Mono<Geo> {
return client.get()
.uri("json?address=$city&key=$API_KEY&language=en")
.exchange()
.flatMap { it.bodyToMono(String::class.java) }
.map { parse(it) }
}
private fun parse(response: …Run Code Online (Sandbox Code Playgroud) kotlin spring-boot project-reactor reactor-netty spring-webflux
我需要访问一个 websocket 服务,它在 24 小时后关闭一个打开的 websocket 连接。我如何实现与 Spring-Boot 2 和 Webflux 的重新连接?
这是我到目前为止所拥有的(取自https://github.com/artembilan/webflux-websocket-demo):
@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getStreaming() throws URISyntaxException {
ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();
EmitterProcessor<String> output = EmitterProcessor.create();
Mono<Void> sessionMono = client.execute(new URI("ws://localhost:8080/echo"),
session -> session.receive()
.timeout(Duration.ofSeconds(3))
.map(WebSocketMessage::getPayloadAsText)
.subscribeWith(output)
.then());
return output.doOnSubscribe(s -> sessionMono.subscribe());
}
Run Code Online (Sandbox Code Playgroud)
一旦连接丢失(3 秒不再输入),就会抛出 TimeoutException。但是如何重新连接套接字?
我想实现简单的 Spring Security WebFlux 应用程序。
我想使用像这样的 JSON 消息
{
'username': 'admin',
'password': 'adminPassword'
}
Run Code Online (Sandbox Code Playgroud)
在正文中(对 /signin 的 POST 请求)以登录我的应用程序。
我做了什么?
我创建了这个配置
@Configuration
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity(proxyTargetClass = true)
public class WebFluxSecurityConfig {
@Autowired
private ReactiveUserDetailsService userDetailsService;
@Autowired
private ObjectMapper mapper;
@Bean
public PasswordEncoder passwordEncoder() {
return new BCryptPasswordEncoder(11);
}
@Bean
public ServerSecurityContextRepository securityContextRepository() {
WebSessionServerSecurityContextRepository securityContextRepository =
new WebSessionServerSecurityContextRepository();
securityContextRepository.setSpringSecurityContextAttrName("securityContext");
return securityContextRepository;
}
@Bean
public ReactiveAuthenticationManager authenticationManager() {
UserDetailsRepositoryReactiveAuthenticationManager authenticationManager =
new UserDetailsRepositoryReactiveAuthenticationManager(userDetailsService);
authenticationManager.setPasswordEncoder(passwordEncoder());
return authenticationManager;
}
@Bean
public AuthenticationWebFilter authenticationWebFilter() { …Run Code Online (Sandbox Code Playgroud) 我将webflux与netty和jdbc一起使用,因此我用以下方式包装阻塞 jdbc 操作:
static <T> Mono<T> fromOne(Callable<T> blockingOperation) {
return Mono.fromCallable(blockingOperation)
.subscribeOn(jdbcScheduler)
.publishOn(Schedulers.parallel());
}
Run Code Online (Sandbox Code Playgroud)
阻塞操作将由jdbcScheduler处理,我希望其他管道将由webflux event-loop Scheduler处理。
如何获取 webflux 事件循环调度程序?
我想了解更多有关springboot webflux的基础并发模型的信息?
对于CPU密集型Web服务,传统的阻塞多线程模型是否更合适?还是根据本文https://people.eecs.berkeley.edu/~brewer/papers/threads-hotos-2003.pdf,在一般传统线程池模型中更合适?
如果我在反应堆链中有阻塞步骤,则可以使用publishOn将其调度到其他线程池。这样是否可以释放原始线程并使整个链仍然畅通无阻?
我不明白反应式 webclient 的工作原理。它说spring webclient是非阻塞客户端,但是这个webclient似乎在等待来自远程api的onComplete()信号,然后它可以处理从远程api发出的每个项目。我期望当 onNext() 从目标 api 被触发时,webclient 可以处理每个项目
我是春季 webflux 世界的新手。我读到它,它说它使用 netty 作为默认服务器。而这个 netty 使用 eventloop。所以为了理解它是如何工作的,我尝试创建 2 个小应用程序,客户端和服务器。服务器应用程序只返回简单的通量,每个项目延迟 1 秒。客户端应用程序使用 webclient 调用远程 api。
服务器:
@GetMapping(ITEM_END_POINT_V1)
public Flux<Item> getAllItems(){
return Flux.just(new Item(null, "Samsung TV", 399.99),
new Item(null, "LG TV", 329.99),
new Item(null, "Apple Watch", 349.99),
new Item("ABC", "Beats HeadPhones",
149.99)).delayElements(Duration.ofSeconds(1)).log("Item : ");
}
Run Code Online (Sandbox Code Playgroud)
客户:
WebClient webClient = WebClient.create("http://localhost:8080");
@GetMapping("/client/retrieve")
public Flux<Item> getAllItemsUsingRetrieve() {
return webClient.get().uri("/v1/items")
.retrieve()
.bodyToFlux(Item.class).log();
}
Run Code Online (Sandbox Code Playgroud)
从服务器登录:
2019-05-01 22:44:20.121 INFO 19644 --- [ctor-http-nio-2] Item : : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2019-05-01 …Run Code Online (Sandbox Code Playgroud) 我有一个像上面一样的 junit 测试,但是当我启动测试时,我收到这个错误(JDK 13.0.1):
java.lang.UnsupportedOperationException: Reflective setAccessible(true) disabled
java.lang.IllegalAccessException: class io.netty.util.internal.PlatformDependent0$6 cannot access class jdk.internal.misc.Unsafe (in module java.base) because module java.base does not export jdk.internal.misc to unnamed module @4a94ee4
Run Code Online (Sandbox Code Playgroud)
我的工具是 Intellij CE Edition 2019.3.1。这是我的类和 pom 文件:
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
class WebClientStockClientIntegrationTest {
private static final String SYMBOL = "EUR";
private WebClient webClient = WebClient.builder().build();
@Test
void shouldRetrieveStockPricesFromStockService() {
WebClientStockClient webClientStockClient = new WebClientStockClient(webClient);
Flux<StockPrice> prices = webClientStockClient.pricesFor(SYMBOL);
Assertions.assertNotNull(prices);
Assertions.assertTrue(prices.take(5).count().block() > 0);
}
}
Run Code Online (Sandbox Code Playgroud)
我的带有 spring-boot …
我在使用 WebClient 时遇到此错误。根据我的配置,这是预期的。但是如何通过netty http客户端配置dns名称解析的超时时间呢?
Caused by: io.netty.resolver.dns.DnsNameResolverTimeoutException: [/127.0.0.11:53] query via UDP timed out after 5000 milliseconds (no stack trace available)
Run Code Online (Sandbox Code Playgroud) 我一直在寻找以下用例的解决方案但没有成功,我希望有人可以提供帮助:
假设以下用例。我需要调用客户 Api( customerApi),而该 api 需要一个Bearer令牌,当我调用时该令牌可能已过期customerApi。如果令牌已过期,则customerApi返回401响应。
我想做的是,如果我收到一个401并调用该方法来获取新Bearer令牌,则仅重试一次。如果重试仍然返回401,我需要抛出一个Exception
获取token的方法Bearer:
private String getToken() {
return oAuthService.getToken();
}
Run Code Online (Sandbox Code Playgroud)
webClient调用customerApi(customerWebClient是用 ) 创建的 bean 的用法WebClient.Builder:
public Customer getCustomerById(String customerId, String token) {
return customerWebClient.get()
.uri("myurl/customers/{customerId}, customerId)
.headers(httpHeaders -> {
httpHeaders.add(HttpHeaders.AUTHORIZATION, "Bearer " + token);
})
.retrieve()
.bodyToMono(Customer.class)
.onErrorResume(WebClientResponseException.NotFound.class, notFound ->
Mono.error(new MyCustomException()))
.block();
}
Run Code Online (Sandbox Code Playgroud)
看来retryWhen只能用超时来升级了。所以我希望有人知道如何实现这个用例^^
感谢您的帮助 :)
编辑 …
我可以在 WebClientBuilder 中添加标头,如下所示:
WebClient.builder().baseUrl(...).defaultHeaders(header -> header.setBasicAuth(...)[...].build();
Run Code Online (Sandbox Code Playgroud)
使用 HttpClient 我正在尝试:
HttpClient.create().baseUrl(...).headers(/*not sure how to set the basic authentication here*/)
Run Code Online (Sandbox Code Playgroud)