Java按顺序解压GZIP流

Eld*_*dad 3 java gzip zlib chunks

我的 Java 程序实现了一个服务器,它应该从客户端通过 websockets 获取一个非常大的文件,使用 gzip 压缩,并且应该检查文件内容中的一些字节模式。

客户端发送嵌入在专有协议中的文件块,因此我从客户端收到一条又一条消息,解析消息并提取 gzipped 文件内容。

我无法在程序内存中保存整个文件,所以我试图解压缩每个块,处理数据并继续下一个块。

我正在使用以下代码:

public static String gzipDecompress(byte[] compressed) throws IOException {
    String uncompressed;
    try (
        ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
        GZIPInputStream gis = new GZIPInputStream(bis);
        Reader reader = new InputStreamReader(gis);
        Writer writer = new StringWriter()
    ) {

      char[] buffer = new char[10240];
      for (int length = 0; (length = reader.read(buffer)) > 0; ) {
        writer.write(buffer, 0, length);
      }
      uncompressed = writer.toString();
    }

    return uncompressed;
  }
Run Code Online (Sandbox Code Playgroud)

但是在使用第一个压缩块调用函数时出现以下异常:

java.io.EOFException: Unexpected end of ZLIB input stream
    at java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:240)
    at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:158)
    at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.Reader.read(Reader.java:140)
Run Code Online (Sandbox Code Playgroud)

重要的是要提到我没有跳过任何块并尝试按顺序解压缩块。

我错过了什么?

Rom*_*kiy 5

问题在于您手动使用这些块。

正确的方法是获取 some InputStream,将其包裹起来,GZIPInputStream然后读取数据。

    InputStream is = // obtain the original gzip stream

    GZIPInputStream gis = new GZIPInputStream(is);
    Reader reader = new InputStreamReader(gis);

    //... proceed reading and so on
Run Code Online (Sandbox Code Playgroud)

GZIPInputStream以流方式工作,因此如果您一次只从reader.

问题更新后更新

针对您的情况的一个可能的解决方案是编写一个InputStream实现,该实现将您的客户端协议处理程序以块的形式放入其中的字节流。

这是一个原型:

public class ProtocolDataInputStream extends InputStream {
    private BlockingQueue<byte[]> nextChunks = new ArrayBlockingQueue<byte[]>(100);
    private byte[] currentChunk = null;
    private int currentChunkOffset = 0;
    private boolean noMoreChunks = false;

    @Override
    public synchronized int read() throws IOException {
        boolean takeNextChunk = currentChunk == null || currentChunkOffset >= currentChunk.length;
        if (takeNextChunk) {
            if (noMoreChunks) {
                // stream is exhausted
                return -1;
            } else {
                currentChunk = nextChunks.take();
                currentChunkOffset = 0;
            }
        }
        return currentChunk[currentChunkOffset++];
    }

    @Override
    public synchronized int available() throws IOException {
        if (currentChunk == null) {
            return 0;
        } else {
            return currentChunk.length - currentChunkOffset;
        }
    }

    public synchronized void addChunk(byte[] chunk, boolean chunkIsLast) {
        nextChunks.add(chunk);
        if (chunkIsLast) {
            noMoreChunks = true;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

您的客户端协议处理程序使用 来添加字节块addChunk(),而您的解压缩代码则从该流中提取数据(通过Reader)。

请注意,此代码有一些问题:

  1. 正在使用的队列大小有限。如果addChunk()调用太频繁,队列可能会被填满,这会阻塞addChunk()。这可能是可取的,也可能不是。
  2. read()出于说明目的实施方法。为了性能,最好以read(byte[])相同的方式实现。
  3. 在假定读取器(解压缩器)和写入器(协议处理程序调用addChunk())是不同线程的情况下使用保守同步。
  4. InterruptedException不做处理take(),以免细节过多。

如果你解压缩,并addChunk()在同一个线程执行(在相同的循环),那么你可以尝试使用InputStream.available()牵引用时方法InputStreamReader.ready()用牵引时Reader