标签: reactor-netty

Spring WebFlux:只允许一个连接接收用户

我正在用Spring 5 Webflux和Kotlin编写一个简单的应用程序.我试图以下列方式实现PUT端点:

PUT("/confs/{id}", {
    val id = it.pathVariable("id")
    ServerResponse.ok().body(service.save(it.bodyToMono(Item::class.java)), Item::class.java)
})
Run Code Online (Sandbox Code Playgroud)

保存的技巧是我尝试从项目中读取城市名称,解析地理坐标,在原始项目中覆盖它们,然后使用Spring Data Mongo Reactive repo保存到Mongo.

fun save(item: Mono<Item>): Mono<Item> {
    val geo = item.flatMap {
            val city = it.location?.city ?: "Somewhere"
            geoService.resolveGeoFromCity(city)
    }

    val zipped = item.zipWith(geo)
        .map {
            it.t1.location?.geo = it.t2
            it.t1
        }

    return repo.saveAll(zipped)
        .toMono()
}
Run Code Online (Sandbox Code Playgroud)

解决地理坐标的代码如下:

@Service
class GeoService() {

    val client = WebClient.create("https://maps.googleapis.com/maps/api/geocode/")

    fun resolveGeoFromCity(city: String): Mono<Geo> {
        return client.get()
                .uri("json?address=$city&key=$API_KEY&language=en")
                .exchange()
                .flatMap { it.bodyToMono(String::class.java) }
                .map { parse(it) }
    }

    private fun parse(response: …
Run Code Online (Sandbox Code Playgroud)

kotlin spring-boot project-reactor reactor-netty spring-webflux

4
推荐指数
1
解决办法
8379
查看次数

如何重新连接 ReactorNettyWebSocketClient 连接?

我需要访问一个 websocket 服务,它在 24 小时后关闭一个打开的 websocket 连接。我如何实现与 Spring-Boot 2 和 Webflux 的重新连接?

这是我到目前为止所拥有的(取自https://github.com/artembilan/webflux-websocket-demo):

@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getStreaming() throws URISyntaxException {
    ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();

    EmitterProcessor<String> output = EmitterProcessor.create();

    Mono<Void> sessionMono = client.execute(new URI("ws://localhost:8080/echo"),

    session -> session.receive()
    .timeout(Duration.ofSeconds(3))
    .map(WebSocketMessage::getPayloadAsText)
    .subscribeWith(output)
    .then());

    return output.doOnSubscribe(s -> sessionMono.subscribe());
}
Run Code Online (Sandbox Code Playgroud)

一旦连接丢失(3 秒不再输入),就会抛出 TimeoutException。但是如何重新连接套接字?

spring-boot reactor-netty spring-webflux

4
推荐指数
1
解决办法
2571
查看次数

Spring Security WebFlux - 带有身份验证的主体

我想实现简单的 Spring Security WebFlux 应用程序。
我想使用像这样的 JSON 消息

{
   'username': 'admin', 
   'password': 'adminPassword'
} 
Run Code Online (Sandbox Code Playgroud)

在正文中(对 /signin 的 POST 请求)以登录我的应用程序。

我做了什么?

我创建了这个配置

@Configuration
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity(proxyTargetClass = true)
public class WebFluxSecurityConfig {

    @Autowired
    private ReactiveUserDetailsService userDetailsService;

    @Autowired
    private ObjectMapper mapper;

    @Bean
    public PasswordEncoder passwordEncoder() {
        return new BCryptPasswordEncoder(11);
    }

    @Bean
    public ServerSecurityContextRepository securityContextRepository() {
        WebSessionServerSecurityContextRepository securityContextRepository =
                new WebSessionServerSecurityContextRepository();

        securityContextRepository.setSpringSecurityContextAttrName("securityContext");

        return securityContextRepository;
    }

    @Bean
    public ReactiveAuthenticationManager authenticationManager() {
        UserDetailsRepositoryReactiveAuthenticationManager authenticationManager =
                new UserDetailsRepositoryReactiveAuthenticationManager(userDetailsService);

        authenticationManager.setPasswordEncoder(passwordEncoder());

        return authenticationManager;
    }

    @Bean
    public AuthenticationWebFilter authenticationWebFilter() { …
Run Code Online (Sandbox Code Playgroud)

java spring-security reactor-netty spring-webflux

4
推荐指数
1
解决办法
1万
查看次数

获取 webflux 事件循环调度程序

我将webfluxnettyjdbc一起使用,因此我用以下方式包装阻塞 jdbc 操作:

static <T> Mono<T> fromOne(Callable<T> blockingOperation) {
    return Mono.fromCallable(blockingOperation)
        .subscribeOn(jdbcScheduler)
        .publishOn(Schedulers.parallel());
}
Run Code Online (Sandbox Code Playgroud)

阻塞操作将由jdbcScheduler处理,我希望其他管道将由webflux event-loop Scheduler处理。

如何获取 webflux 事件循环调度程序?

project-reactor reactor-netty spring-webflux

4
推荐指数
1
解决办法
2605
查看次数

Springboot Webflux反​​应器并发模型

我想了解更多有关springboot webflux的基础并发模型的信息?

对于CPU密集型Web服务,传统的阻塞多线程模型是否更合适?还是根据本文https://people.eecs.berkeley.edu/~brewer/papers/threads-hotos-2003.pdf,在一般传统线程池模型中更合适?

如果我在反应堆链中有阻塞步骤,则可以使用publishOn将其调度到其他线程池。这样是否可以释放原始线程并使整个链仍然畅通无阻?

spring-boot project-reactor reactor-netty

4
推荐指数
1
解决办法
950
查看次数

Spring webclient 是非阻塞客户端吗?

我不明白反应式 webclient 的工作原理。它说spring webclient是非阻塞客户端,但是这个webclient似乎在等待来自远程api的onComplete()信号,然后它可以处理从远程api发出的每个项目。我期望当 onNext() 从目标 api 被触发时,webclient 可以处理每个项目

我是春季 webflux 世界的新手。我读到它,它说它使用 netty 作为默认服务器。而这个 netty 使用 eventloop。所以为了理解它是如何工作的,我尝试创建 2 个小应用程序,客户端和服务器。服务器应用程序只返回简单的通量,每个项目延迟 1 秒。客户端应用程序使用 webclient 调用远程 api。

服务器:

@GetMapping(ITEM_END_POINT_V1)
public Flux<Item> getAllItems(){
        return Flux.just(new Item(null, "Samsung TV", 399.99),
                new Item(null, "LG TV", 329.99),
                new Item(null, "Apple Watch", 349.99),
                new Item("ABC", "Beats HeadPhones", 
      149.99)).delayElements(Duration.ofSeconds(1)).log("Item : ");
}
Run Code Online (Sandbox Code Playgroud)

客户:

WebClient webClient = WebClient.create("http://localhost:8080");

@GetMapping("/client/retrieve")
public Flux<Item> getAllItemsUsingRetrieve() {
        return webClient.get().uri("/v1/items")
                .retrieve()
                .bodyToFlux(Item.class).log();
}
Run Code Online (Sandbox Code Playgroud)

从服务器登录:

2019-05-01 22:44:20.121  INFO 19644 --- [ctor-http-nio-2] Item :                                   : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2019-05-01 …
Run Code Online (Sandbox Code Playgroud)

reactor-netty spring-webflux spring-webclient

4
推荐指数
1
解决办法
3812
查看次数

带有 netty 的 spring-web-flux 错误“反射 setAccessible(true) 已禁用”

我有一个像上面一样的 junit 测试,但是当我启动测试时,我收到这个错误(JDK 13.0.1):

java.lang.UnsupportedOperationException: Reflective setAccessible(true) disabled
java.lang.IllegalAccessException: class io.netty.util.internal.PlatformDependent0$6 cannot access class jdk.internal.misc.Unsafe (in module java.base) because module java.base does not export jdk.internal.misc to unnamed module @4a94ee4
Run Code Online (Sandbox Code Playgroud)

我的工具是 Intellij CE Edition 2019.3.1。这是我的类和 pom 文件:

    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;
    import org.springframework.web.reactive.function.client.WebClient;
    import reactor.core.publisher.Flux;

    class WebClientStockClientIntegrationTest {

        private static final String SYMBOL = "EUR";
        private WebClient webClient = WebClient.builder().build();

        @Test
        void shouldRetrieveStockPricesFromStockService() {
            WebClientStockClient webClientStockClient = new WebClientStockClient(webClient);
            Flux<StockPrice> prices = webClientStockClient.pricesFor(SYMBOL);
            Assertions.assertNotNull(prices);
            Assertions.assertTrue(prices.take(5).count().block() > 0);
        }
}
Run Code Online (Sandbox Code Playgroud)

我的带有 spring-boot …

java spring-boot reactor-netty spring-webflux

4
推荐指数
1
解决办法
3600
查看次数

Netty Http 客户端 - 配置 DNSResolution 超时

我在使用 WebClient 时遇到此错误。根据我的配置,这是预期的。但是如何通过netty http客户端配置dns名称解析的超时时间呢?

Caused by: io.netty.resolver.dns.DnsNameResolverTimeoutException: [/127.0.0.11:53] query via UDP timed out after 5000 milliseconds (no stack trace available)
Run Code Online (Sandbox Code Playgroud)

netty reactor-netty spring-webflux

4
推荐指数
1
解决办法
5103
查看次数

Spring WebClient:重试中调用方法

我一直在寻找以下用例的解决方案但没有成功,我希望有人可以提供帮助:

假设以下用例。我需要调用客户 Api( customerApi),而该 api 需要一个Bearer令牌,当我调用时该令牌可能已过期customerApi。如果令牌已过期,则customerApi返回401响应。

我想做的是,如果我收到一个401并调用该方法来获取新Bearer令牌,则仅重试一次。如果重试仍然返回401,我需要抛出一个Exception

获取token的方法Bearer

private String getToken() {
    return oAuthService.getToken();
}
Run Code Online (Sandbox Code Playgroud)

webClient调用customerApi(customerWebClient是用 ) 创建的 bean 的用法WebClient.Builder

public Customer getCustomerById(String customerId, String token) {
        return customerWebClient.get()
            .uri("myurl/customers/{customerId}, customerId)
            .headers(httpHeaders -> {
                httpHeaders.add(HttpHeaders.AUTHORIZATION, "Bearer " + token);
            })
            .retrieve()
            .bodyToMono(Customer.class)
            .onErrorResume(WebClientResponseException.NotFound.class, notFound ->
                        Mono.error(new MyCustomException()))
            .block();
    }
Run Code Online (Sandbox Code Playgroud)

看来retryWhen只能用超时来升级了。所以我希望有人知道如何实现这个用例^^

感谢您的帮助 :)

编辑 …

reactor-netty spring-webflux spring-webclient

3
推荐指数
1
解决办法
2906
查看次数

如何初始化reactor HttpClient的基本身份验证标头?

我可以在 WebClientBuilder 中添加标头,如下所示:

WebClient.builder().baseUrl(...).defaultHeaders(header -> header.setBasicAuth(...)[...].build();
Run Code Online (Sandbox Code Playgroud)

使用 HttpClient 我正在尝试:

HttpClient.create().baseUrl(...).headers(/*not sure how to set the basic authentication here*/)
Run Code Online (Sandbox Code Playgroud)

java spring reactor-netty

3
推荐指数
1
解决办法
1714
查看次数