如何解压缩Flux <DataBuffer>(以及如何写一个)?

Abh*_*kar 5 java spring spring-integration netty spring-webflux

我需要在没有中间存储的情况下读写压缩(GZIP)流。当前,我正在使用Spring RestTemplate进行编写,而使用Apache HTTP客户端来进行阅读(请参阅此处的答案,以解释为何RestTemplate不能用于读取大数据流)。实现是相当简单的,我GZIPInputStream在响应上打了一下InputStream然后继续。

现在,我想切换到使用Spring 5 WebClient(只是因为我不喜欢现状)。但是,WebClient本质上是反应性的,并且要处理Flux<Stuff>;我相信有可能获得Flux<DataBuffer>,其中DataBuffer是的抽象ByteBuffer。问题是,如何在不将整个流存储到内存(OutOfMemoryError,我在看着你)或写入本地磁盘的情况下即时对其进行解压缩?值得一提的是WebClient在后台使用Netty。

我承认我不太了解(减压),但是我做了研究,但是网上提供的资料似乎都没有什么帮助。

Java Nio直接缓冲区上的压缩

用nio编写GZIP文件

从FileChannel(Java NIO)读取GZIP文件

使用NIO(解压缩)文件

Java中可迭代的gzip压缩/膨胀

Abh*_*kar 4

public class HttpResponseHeadersHandler extends ChannelInboundHandlerAdapter {
    private final HttpHeaders httpHeaders;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpResponse &&
                !HttpStatus.resolve(((HttpResponse) msg).status().code()).is1xxInformational()) {
            HttpHeaders headers = ((HttpResponse) msg).headers();

            httpHeaders.forEach(e -> {
                log.warn("Modifying {} from: {} to: {}.", e.getKey(), headers.get(e.getKey()), e.getValue());
                headers.set(e.getKey(), e.getValue());
            });
        }
        ctx.fireChannelRead(msg);
    }
}
Run Code Online (Sandbox Code Playgroud)

然后我创建一个与添加处理程序ClientHttpConnector一起使用WebClientafterNettyContextInit添加的处理程序:

ctx.addHandlerLast(new ReadTimeoutHandler(readTimeoutMillis, TimeUnit.MILLISECONDS));
ctx.addHandlerLast(new Slf4JLoggingHandler());
if (forceDecompression) {
    io.netty.handler.codec.http.HttpHeaders httpHeaders = new ReadOnlyHttpHeaders(
            true,
            CONTENT_ENCODING, GZIP,
            CONTENT_TYPE, APPLICATION_JSON
    );
    HttpResponseHeadersHandler headersModifier = new HttpResponseHeadersHandler(httpHeaders);
    ctx.addHandlerFirst(headersModifier);
}
ctx.addHandlerLast(new HttpContentDecompressor());
Run Code Online (Sandbox Code Playgroud)

当然,对于未经过 GZIP 压缩的响应,这会失败,因此我WebClient仅将这个实例用于特定用例,在该用例中我确信响应已被压缩。

编写很简单:Spring 有一个ResourceEncoder, 所以InputStream可以简单地转换为InputStreamResource, 瞧!

  • 创建 Spring `WebClient` 时,在哪里可以找到 `afterNettyContextInit` 方法? (3认同)