如何在Spring WebFlux中记录请求和响应主体

Kog*_*uro 21 kotlin spring-boot project-reactor spring-webflux

我希望使用Kotlin在Spring WebFlux上的REST API中集中记录请求和响应.到目前为止,我已经尝试过这种方法

@Bean
fun apiRouter() = router {
    (accept(MediaType.APPLICATION_JSON) and "/api").nest {
        "/user".nest {
            GET("/", userHandler::listUsers)
            POST("/{userId}", userHandler::updateUser)
        }
    }
}.filter { request, next ->
    logger.info { "Processing request $request with body ${request.bodyToMono<String>()}" }
    next.handle(request).doOnSuccess { logger.info { "Handling with response $it" } }
}
Run Code Online (Sandbox Code Playgroud)

这里请求方法和路径日志成功但身体是Mono,所以我该如何记录呢?应该是相反的方式,我必须订阅请求正文Mono并将其记录在回调中?另一个问题是ServerResponse这里的接口无法访问响应主体.我怎么能在这里得到它?


我尝试过的另一种方法是使用 WebFilter

@Bean
fun loggingFilter(): WebFilter =
        WebFilter { exchange, chain ->
            val request = exchange.request
            logger.info { "Processing request method=${request.method} path=${request.path.pathWithinApplication()} params=[${request.queryParams}] body=[${request.body}]"  }

            val result = chain.filter(exchange)

            logger.info { "Handling with response ${exchange.response}" }

            return@WebFilter result
        }
Run Code Online (Sandbox Code Playgroud)

同样的问题:请求正文是Flux没有响应主体.

有没有办法从某些过滤器访问完整的请求和响应?我不明白什么?

Bri*_*zel 13

这或多或少类似于Spring MVC中的情况.

在Spring MVC中,您可以使用AbstractRequestLoggingFilter过滤器 ContentCachingRequestWrapper和/或ContentCachingResponseWrapper.这里有许多权衡:

  • 如果您想访问servlet请求属性,则需要实际读取和解析请求主体
  • 记录请求主体意味着缓冲请求主体,该主体可以使用大量内存
  • 如果你想访问响应主体,你需要包装响应并在写入时缓冲响应主体,以便以后检索

ContentCaching*WrapperWebFlux中不存在类,但您可以创建类似的类.但请记住其他要点:

  • 在内存中缓冲数据会以某种方式违背反应堆栈,因为我们在那里尝试使用可用资源非常高效
  • 你不应该篡改实际的数据流,并且比预期的更频繁/更少地冲洗,否则你就有可能破坏流媒体用例
  • 在该级别,您只能访问DataBuffer实例,这些实例是(大致)内存高效的字节数组.那些属于缓冲池,并被回收用于其他交换.如果没有正确保留/释放它们,则会创建内存泄漏(缓冲数据以供以后使用,这当然适合这种情况)
  • 再次在该级别,它只是字节,你没有访问任何编解码器来解析HTTP正文.如果内容首先不是人类可读的,我会忘记缓冲内容

您问题的其他答案:

  • 是的,这WebFilter可能是最好的方法
  • 不,您不应该订阅请求正文,否则您将使用处理程序无法读取的数据; 你可以flatMapdoOn运算符上请求和缓冲数据
  • 包装响应应该允许您在写入时访问响应主体; 但是,不要忘记内存泄漏

  • 感谢您的详细回答。看起来这种高级过滤(和日志记录)与核心反应式意识形态背道而驰,我应该考虑将日志记录移至业务级别(至少对于响应而言) (2认同)

Dar*_*ski 9

我没有找到记录请求/响应主体的好方法,但如果您只对元数据感兴趣,那么您可以像下面这样做.

import org.springframework.http.HttpHeaders
import org.springframework.http.HttpStatus
import org.springframework.http.server.reactive.ServerHttpResponse
import org.springframework.stereotype.Component
import org.springframework.web.server.ServerWebExchange
import org.springframework.web.server.WebFilter
import org.springframework.web.server.WebFilterChain
import reactor.core.publisher.Mono

@Component
class LoggingFilter(val requestLogger: RequestLogger, val requestIdFactory: RequestIdFactory) : WebFilter {
    val logger = logger()

    override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
        logger.info(requestLogger.getRequestMessage(exchange))
        val filter = chain.filter(exchange)
        exchange.response.beforeCommit {
            logger.info(requestLogger.getResponseMessage(exchange))
            Mono.empty()
        }
        return filter
    }
}

@Component
class RequestLogger {

    fun getRequestMessage(exchange: ServerWebExchange): String {
        val request = exchange.request
        val method = request.method
        val path = request.uri.path
        val acceptableMediaTypes = request.headers.accept
        val contentType = request.headers.contentType
        return ">>> $method $path ${HttpHeaders.ACCEPT}: $acceptableMediaTypes ${HttpHeaders.CONTENT_TYPE}: $contentType"
    }

    fun getResponseMessage(exchange: ServerWebExchange): String {
        val request = exchange.request
        val response = exchange.response
        val method = request.method
        val path = request.uri.path
        val statusCode = getStatus(response)
        val contentType = response.headers.contentType
        return "<<< $method $path HTTP${statusCode.value()} ${statusCode.reasonPhrase} ${HttpHeaders.CONTENT_TYPE}: $contentType"
    }

    private fun getStatus(response: ServerHttpResponse): HttpStatus =
        try {
            response.statusCode
        } catch (ex: Exception) {
            HttpStatus.CONTINUE
        }
}
Run Code Online (Sandbox Code Playgroud)

  • 该死的“var”之类的东西。 (2认同)

Iva*_*mar 6

这就是我为java想到的。

public class RequestResponseLoggingFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        ServerHttpRequest httpRequest = exchange.getRequest();
        final String httpUrl = httpRequest.getURI().toString();

        ServerHttpRequestDecorator loggingServerHttpRequestDecorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
            String requestBody = "";

            @Override
            public Flux<DataBuffer> getBody() {
                return super.getBody().doOnNext(dataBuffer -> {
                    try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
                        Channels.newChannel(byteArrayOutputStream).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
                        requestBody = IOUtils.toString(byteArrayOutputStream.toByteArray(), "UTF-8");
                        commonLogger.info(LogMessage.builder()
                                .step(httpUrl)
                                .message("log incoming http request")
                                .stringPayload(requestBody)
                                .build());
                    } catch (IOException e) {
                        commonLogger.error(LogMessage.builder()
                                .step("log incoming request for " + httpUrl)
                                .message("fail to log incoming http request")
                                .errorType("IO exception")
                                .stringPayload(requestBody)
                                .build(), e);
                    }
                });
            }
        };

        ServerHttpResponseDecorator loggingServerHttpResponseDecorator = new ServerHttpResponseDecorator(exchange.getResponse()) {
            String responseBody = "";
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                Mono<DataBuffer> buffer = Mono.from(body);
                return super.writeWith(buffer.doOnNext(dataBuffer -> {
                    try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
                        Channels.newChannel(byteArrayOutputStream).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
                        responseBody = IOUtils.toString(byteArrayOutputStream.toByteArray(), "UTF-8");
                        commonLogger.info(LogMessage.builder()
                                .step("log outgoing response for " + httpUrl)
                                .message("incoming http request")
                                .stringPayload(responseBody)
                                .build());
                    } catch (Exception e) {
                        commonLogger.error(LogMessage.builder()
                                .step("log outgoing response for " + httpUrl)
                                .message("fail to log http response")
                                .errorType("IO exception")
                                .stringPayload(responseBody)
                                .build(), e);
                    }
                }));
            }
        };
        return chain.filter(exchange.mutate().request(loggingServerHttpRequestDecorator).response(loggingServerHttpResponseDecorator).build());
    }

}
Run Code Online (Sandbox Code Playgroud)


ROC*_*CKY 5

实际上,您可以启用 Netty 和 Reactor-Netty 相关的 DEBUG 日志记录,以查看正在发生的情况的全貌。你可以尝试下面的内容,看看你想要什么,不想要什么。这是我能做到的最好的了。

reactor.ipc.netty.channel.ChannelOperationsHandler: DEBUG
reactor.ipc.netty.http.server.HttpServer: DEBUG
reactor.ipc.netty.http.client: DEBUG
io.reactivex.netty.protocol.http.client: DEBUG
io.netty.handler: DEBUG
io.netty.handler.proxy.HttpProxyHandler: DEBUG
io.netty.handler.proxy.ProxyHandler: DEBUG
org.springframework.web.reactive.function.client: DEBUG
reactor.ipc.netty.channel: DEBUG
Run Code Online (Sandbox Code Playgroud)

  • 我刚刚添加了这个。它甚至没有显示一条日志 (2认同)

Tar*_*rek 5

从 Spring Boot 2.2.x 开始,Spring Webflux 支持Kotlin 协程。使用协程,您可以享受非阻塞调用的优势,而无需处理 Mono 和 Flux 包装的对象。它添加了ServerRequestServerResponse的扩展,添加了ServerRequest#awaitBody()和等方法ServerResponse.BodyBuilder.bodyValueAndAwait(body: Any)。所以你可以像这样重写你的代码:

@Bean
fun apiRouter() = coRouter {
    (accept(MediaType.APPLICATION_JSON) and "/api").nest {
        "/user".nest {
            /* the handler methods now use ServerRequest and ServerResponse directly
             you just need to add suspend before your function declaration:
             suspend fun listUsers(ServerRequest req, ServerResponse res) */ 
            GET("/", userHandler::listUsers)
            POST("/{userId}", userHandler::updateUser)
        }
    }

    // this filter will be applied to all routes built by this coRouter
    filter { request, next ->
      // using non-blocking request.awayBody<T>()
      logger.info("Processing $request with body ${request.awaitBody<String>()}")
        val res = next(request)
        logger.info("Handling with Content-Type ${res.headers().contentType} and status code ${res.rawStatusCode()}")
        res 
    }
}
Run Code Online (Sandbox Code Playgroud)

为了用coRoutines创建一个WebFilter Bean,我想你可以使用这个CoroutineWebFilter接口(我没有测试过,不知道它是否有效)。