标签: project-reactor

如何在Spring WebFlux中记录请求和响应主体

我希望使用Kotlin在Spring WebFlux上的REST API中集中记录请求和响应.到目前为止,我已经尝试过这种方法

@Bean
fun apiRouter() = router {
    (accept(MediaType.APPLICATION_JSON) and "/api").nest {
        "/user".nest {
            GET("/", userHandler::listUsers)
            POST("/{userId}", userHandler::updateUser)
        }
    }
}.filter { request, next ->
    logger.info { "Processing request $request with body ${request.bodyToMono<String>()}" }
    next.handle(request).doOnSuccess { logger.info { "Handling with response $it" } }
}
Run Code Online (Sandbox Code Playgroud)

这里请求方法和路径日志成功但身体是Mono,所以我该如何记录呢?应该是相反的方式,我必须订阅请求正文Mono并将其记录在回调中?另一个问题是ServerResponse这里的接口无法访问响应主体.我怎么能在这里得到它?


我尝试过的另一种方法是使用 WebFilter

@Bean
fun loggingFilter(): WebFilter =
        WebFilter { exchange, chain ->
            val request = exchange.request
            logger.info { "Processing request method=${request.method} path=${request.path.pathWithinApplication()} params=[${request.queryParams}] body=[${request.body}]"  }

            val result …
Run Code Online (Sandbox Code Playgroud)

kotlin spring-boot project-reactor spring-webflux

21
推荐指数
5
解决办法
2万
查看次数

reactor与反应堆中的flatMap

我已经找到了很多关于RxJava的答案,但我想了解它在Reactor中是如何工作的.

我目前的理解非常模糊,我倾向于认为map是同步的,而flatMap是异步的,但我不能真正理解它.

这是一个例子:

files.flatMap { it ->
    Mono.just(Paths.get(UPLOAD_ROOT, it.filename()).toFile())
        .map {destFile ->
            destFile.createNewFile()
            destFile    
        }               
        .flatMap(it::transferTo)
}.then()  
Run Code Online (Sandbox Code Playgroud)

我有文件(a Flux<FilePart>),我想将它复制到UPLOAD_ROOT服务器上的一些.

这个例子来自一本书.

我可以改变一切.map,以.flatMap反之亦然,一切仍然有效.我想知道区别是什么.

java project-reactor

20
推荐指数
3
解决办法
9559
查看次数

如何在Spring Reactor Web应用程序中执行操作序列并确保一个操作在下一个操作之前完成?

我有Spring Boot 2网络应用程序,我需要通过cookie识别网站访问者并收集页面查看统计信息.所以我需要拦截每个Web请求.我必须编写的代码比回调地狱更复杂(Spring反应堆应该解决的问题).

这是代码:

package mypack.conf;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
import org.springframework.http.HttpCookie;
import org.springframework.http.ResponseCookie;
import org.springframework.web.reactive.config.ResourceHandlerRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;

import mypack.dao.PageViewRepository;
import mypack.dao.UserRepository;
import mypack.domain.PageView;
import mypack.domain.User;
import mypack.security.JwtProvider;

import reactor.core.publisher.Mono;
@Configuration


@ComponentScan(basePackages = "mypack")
@EnableReactiveMongoRepositories(basePackages = "mypack")
public class WebConfig implements WebFluxConfigurer {

    @Autowired
    @Lazy
    private UserRepository userRepository;

    @Autowired
    @Lazy
    private PageViewRepository pageViewRepository;


    @Autowired …
Run Code Online (Sandbox Code Playgroud)

spring spring-boot project-reactor spring-webflux

20
推荐指数
2
解决办法
929
查看次数

将虚拟线程(Project Loom)与 Spring WebFlux/Reactor/Reactive 库结合使用

Java虚拟线程

Java 19 中引入了虚拟线程JEP-425作为预览功能。

在对 Java 虚拟线程(Project Loom)的概念(有时称为轻量级线程(有时称为纤维绿色线程))进行一些研究之后,我对它们与反应式库的潜在使用非常感兴趣,例如基于 Spring WebFlux在 Project Reactor(反应流实现)和 Netty 上,用于有效地进行阻塞调用。

如今,大多数 JVM 实现都将 Java 线程实现为操作系统线程的瘦直接包装器,有时称为重量级、操作系统管理的线程平台线程。

虽然平台线程一次只能执行一个线程,但是当当前执行的虚拟线程进行阻塞调用(例如网络、文件系统、数据库调用)时,虚拟线程能够切换到执行不同的虚拟线程。

我们如何处理 Reactor 中的阻塞调用?

因此,在 Reactor 中处理阻塞调用时,我们使用以下构造

Mono.fromCallable(() -> {
     return blockingOperation();
}).subscribeOn(Schedulers.boundedElastic());
Run Code Online (Sandbox Code Playgroud)

我们subcribeOn()提供了一个Scheduler创建专用线程来执行该阻塞操作的方法。然而,这意味着线程最终将被阻塞,因此,由于我们仍然使用老式的线程模型,我们实际上会阻塞平台线程,这仍然不是处理 CPU 资源的真正有效的方式。

这是问题:

所以,问题是,我们是否可以直接使用具有反应式框架的虚拟线程来进行这样的阻塞调用,例如使用Executors.newVirtualThreadPerTaskExecutor()

创建一个执行器,为每个任务启动一个新的虚拟线程。Executor创建的线程数量是无限制的。

Mono.fromCallable(() -> {
    return …
Run Code Online (Sandbox Code Playgroud)

java spring-boot project-reactor spring-webflux project-loom

20
推荐指数
1
解决办法
9108
查看次数

Mono 超时时的“运算符调用默认 onErrorDropped”

在我的生产代码中,当 Mono 超时时,我的日志中出现错误。
我已设法使用以下代码重新创建这些错误:

@Test
public void testScheduler() {
    Mono<String> callableMethod1 = callableMethod();
    callableMethod1.block();

    Mono<String> callableMethod2 = callableMethod();
    callableMethod2.block();
}

private Mono<String> callableMethod() {
    return Mono.fromCallable(() -> {
        Thread.sleep(60);
        return "Success";
    })
            .subscribeOn(Schedulers.elastic())
            .timeout(Duration.ofMillis(50))
            .onErrorResume(throwable -> Mono.just("Timeout"));
}
Run Code Online (Sandbox Code Playgroud)

Mono.fromCallable我正在使用第三方库进行阻塞调用。当此调用超时时,我收到类似于

reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.publisher.Operators - Scheduler worker in group main failed with an uncaught exception
Run Code Online (Sandbox Code Playgroud)

这些错误似乎也是间歇性的,有时当我运行代码时,我根本没有得到任何错误。但是,当我以 10 的循环重复调用时,我始终得到它们。

project-reactor

19
推荐指数
1
解决办法
8363
查看次数

为什么 Sinks.many().multicast().onBackPressureBuffer() 在订阅者之一取消订阅后完成以及如何避免它

Sinks.Many<String>在使用向多个订阅者通知某些事件时,我遇到了一种我不明白的行为:

fun main() {

    val sink : Sinks.Many<String>  = Sinks.many().multicast().onBackpressureBuffer()
    val flux = sink.asFlux().log()

    val d = flux.subscribe {
        println("--> $it")
    }

    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)

    val d2 = flux.subscribe {
        println("--2> $it")
    }

    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}
Run Code Online (Sandbox Code Playgroud)

此代码显示第一个订阅者获取值 1 和 2,第二个订阅者获取值 2。到目前为止一切顺利:

11:49:06.936 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.938 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--> …
Run Code Online (Sandbox Code Playgroud)

kotlin project-reactor reactive-streams spring-webflux

19
推荐指数
1
解决办法
6215
查看次数

如何将Mono <List <String >>转换为Flux <String>

我正在将用RxJava 1.x编写的小项目转换为Reactor 3.x. 一切都很好,除了我无法找到如何替换flatMap(Observable::from)适当的对应物.我有Mono<List<String>>,我需要将其转换为Flux<String>.

谢谢

java project-reactor

18
推荐指数
2
解决办法
1万
查看次数

block() 、 subscribe() 和 subscribe(-) 之间有什么区别

Mono.delay(Duration.ofMillis(10)).map(d -> {
            System.out.println(d);
            return d;
        }).block();
Run Code Online (Sandbox Code Playgroud)

输出:0

当我使用subscribe()subscribe(-)方法而不是调用时,无法在控制台上看到任何输出block()

Mono.delay(Duration.ofMillis(10)).map(d -> {
        System.out.println(d);
        return d;
    }).subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

doOnSubscribe(-)难道我们只需要在这个方法之后使用吗Mono.delay(-)

 Mono.delay(Duration.ofMillis(10)).doOnSubscribe(s -> {
        System.out.println("its printing doOnSubscribe");
    }).map(d -> {
        System.out.println(d);
        return d;
    }).subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

输出它正在打印doOnSubscribe

project-reactor spring-webflux

18
推荐指数
1
解决办法
2万
查看次数

如何检查Mono是否为空?

我正在使用WebFlux框架开发一个使用Spring Boot 2.0和Kotlin的应用程序.

我想在保存交易之前检查用户ID是否退出.我被困在一个简单的事情,如验证Mono是否为空.

fun createTransaction(serverRequest: ServerRequest) : Mono<ServerResponse> {
    val transaction = serverRequest.body(BodyExtractors.toMono(Transaction::class.java))

    transaction.flatMap {
        val user = userRepository.findById(it.userId)
        // If it's empty, return badRequest() 
    } 

    return transaction.flatMap { transactionRepository.save(it).then(created(URI.create("/transaction/" + it.id)).build()) }
}
Run Code Online (Sandbox Code Playgroud)

有可能做我想要的吗?

spring kotlin project-reactor reactive

17
推荐指数
3
解决办法
2万
查看次数

将CompletableFuture <Stream <T >>转换为Publisher <T>是否正确?

允许对结果流进行多次迭代,CompletableFuture<Stream<String>>我正在考虑以下方法之一:

  1. 将生成的未来转换为CompletableFuture<List<String>>:teams.thenApply(st -> st.collect(toList()))

  2. 将生成的未来转换为Flux<String>缓存:Flux.fromStream(teams::join).cache();

Flux<T>Publisher<T>项目反应堆的实施.

使用案例:

我想Stream<String>从一个数据源获得一个具有顶级联赛球队名称的序列(例如),该数据源提供一个League对象Standing[](基于足球数据RESTful API,例如http://api.football-data.org/v1/ soccerseasons/445/leagueTable).使用AsyncHttpClientGson我们有:

CompletableFuture<Stream<String>> teams = asyncHttpClient
    .prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
    .execute()
    .toCompletableFuture()
    .thenApply(Response::getResponseBody)
    .thenApply(body -> gson.fromJson(body, League.class));
    .thenApply(l -> stream(l.standings).map(s -> s.teamName));
Run Code Online (Sandbox Code Playgroud)

要重新使用生成的流,我有两个选择:

1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))

2. Flux<String> res = Flux.fromStream(teams::join).cache()
Run Code Online (Sandbox Code Playgroud)

Flux<T>不那么冗长,并提供我所需要的一切.然而,在这种情况下使用它是否正确?

或者我应该使用CompletableFuture<List<String>>?或者还有其他更好的选择吗?

更新了一些想法(2018-03-16):

CompletableFuture<List<String>>:

  • [PROS] List<String>将继续收集,当我们需要继续处理未来的结果时,可能已经完成了.
  • [CONS]宣言冗长.
  • [CONS]如果我们只想使用它一次,那么我们就不需要收集那些物品了List<T>.

Flux<String> …

java java-8 rx-java project-reactor completable-future

17
推荐指数
1
解决办法
2030
查看次数