How do I instantiate an FSDataInputStream from a raw InputStream?

Aub*_*ine 8 apache spring hadoop

I really don't understand how to create an InputStream like this, one that is both Seekable and PositionedReadable...

Resource resource = new ClassPathResource("somefile");
InputStream bla = resource.getInputStream();
FSDataInputStream inputStream = new FSDataInputStream (bla);

The FSDataInputStream line throws:

java.lang.IllegalArgumentException: In is not an instance of Seekable or PositionedReadable

I need this for mocking, and it is a blocker for me.

Vin*_*kal 5

As shown below, the FSDataInputStream constructor defined in FSDataInputStream.java expects its InputStream argument to be an instance of both Seekable and PositionedReadable:

public FSDataInputStream(InputStream in) throws IOException 
 {
    super(in);
    if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
      throw new IllegalArgumentException(
          "In is not an instance of Seekable or PositionedReadable");
    }
 }
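To see why a plain stream is rejected, the type check can be reproduced in isolation. The following is only a sketch: the two nested interfaces are empty stand-ins for Hadoop's `Seekable` and `PositionedReadable` (so it runs without Hadoop on the classpath), and `ConstructorCheckSketch`, `BothStream` and `wouldBeAccepted` are hypothetical names used for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

// Stand-alone sketch of the constructor's type check. The two interfaces below are
// empty stand-ins (assumptions) for org.apache.hadoop.fs.Seekable and
// org.apache.hadoop.fs.PositionedReadable so the example runs without Hadoop.
class ConstructorCheckSketch {

    interface Seekable {}
    interface PositionedReadable {}

    // A stream type that carries both interfaces.
    static class BothStream extends ByteArrayInputStream implements Seekable, PositionedReadable {
        BothStream(byte[] data) { super(data); }
    }

    // Mirrors the condition used by the constructor quoted above.
    static boolean wouldBeAccepted(InputStream in) {
        return in instanceof Seekable && in instanceof PositionedReadable;
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3};
        System.out.println(wouldBeAccepted(new ByteArrayInputStream(data))); // false
        System.out.println(wouldBeAccepted(new BothStream(data)));           // true
    }
}
```

A stream returned by `ClassPathResource.getInputStream()` implements neither interface, so wrapping it directly hits the exception from the question.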

Hopefully the following solution helps.

import java.io.*;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

public class SeekableTest {

    public static void main(String[] args) throws IOException
    {
        Resource resource = new ClassPathResource("somefile");
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        int read;

        // Copy the whole resource into memory; read() returns -1 at end of stream.
        try (InputStream in = resource.getInputStream()) {
            while ((read = in.read(buf)) != -1)
                baos.write(buf, 0, read);
        }

        byte data[] = baos.toByteArray();
        SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
        FSDataInputStream in2 = new FSDataInputStream(bais);
    }

    static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {

        public SeekableByteArrayInputStream(byte[] buf)
        {
            super(buf);
        }
        @Override
        public long getPos() throws IOException{
            return pos;
        }

        @Override
        public void seek(long newPos) throws IOException {
          // Seeking only moves the inherited read cursor; reject out-of-range targets.
          if (newPos < 0 || newPos > count)
            throw new IOException("Cannot seek to " + newPos + " (length " + count + ")");

          this.pos = (int) newPos;
        }

        @Override
        public boolean seekToNewSource(long targetPos) throws IOException {
          return false;
        }

        @Override
        public int read(long position, byte[] buffer, int offset, int length) throws IOException {
          // Positioned read: copies from 'position' without moving the stream cursor.
          if (position < 0 || offset < 0 || length < 0 || length > buffer.length - offset)
            throw new IllegalArgumentException();
          if (length == 0)
            return 0;
          if (position >= count)
            return -1; // at or past the end of the data

          int n = (int) Math.min(length, count - position);
          System.arraycopy(buf, (int) position, buffer, offset, n);
          return n;
        }

        @Override
        public void readFully(long position, byte[] buffer) throws IOException {
          readFully(position, buffer, 0, buffer.length);
        }

        @Override
        public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
          // readFully must deliver exactly 'length' bytes or fail.
          if (position + length > count)
            throw new EOFException();
          read(position, buffer, offset, length);
        }
    }
}
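Since the goal is mocking, it may help to check the two behaviours a test is likely to depend on: a seek moves the stream's read cursor, while a positioned read leaves it alone. The sketch below is a trimmed, Hadoop-free variant of the idea, not the class above. `CursorDemo`, `CursorDemoStream` and `readAt` are hypothetical names, and the methods only mirror the shape of `Seekable.seek`/`getPos` and `PositionedReadable.read`.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

// Hadoop-free sketch: CursorDemoStream is a hypothetical, trimmed-down class whose
// methods only mirror the shape of Seekable.seek/getPos and PositionedReadable.read.
class CursorDemo {

    static class CursorDemoStream extends ByteArrayInputStream {
        CursorDemoStream(byte[] data) { super(data); }

        long getPos() { return pos; }

        // Move the inherited read cursor to an absolute offset.
        void seek(long newPos) {
            if (newPos < 0 || newPos > count) {
                throw new IllegalArgumentException("offset out of range: " + newPos);
            }
            pos = (int) newPos;
        }

        // Copy bytes starting at 'position' without touching the read cursor.
        int readAt(long position, byte[] buffer, int offset, int length) {
            if (position < 0 || position >= count) return -1;
            int n = (int) Math.min(length, count - position);
            System.arraycopy(buf, (int) position, buffer, offset, n);
            return n;
        }
    }

    // Returns the cursor position observed after a seek followed by a positioned read.
    static long posAfterSeekAndPositionedRead() {
        CursorDemoStream s = new CursorDemoStream("hello world".getBytes(StandardCharsets.US_ASCII));
        s.seek(6);
        byte[] out = new byte[5];
        s.readAt(0, out, 0, out.length); // fills 'out' with "hello"
        return s.getPos();               // the cursor is still at 6
    }

    public static void main(String[] args) {
        CursorDemoStream s = new CursorDemoStream("hello world".getBytes(StandardCharsets.US_ASCII));
        s.seek(6);
        System.out.println((char) s.read());                 // w
        System.out.println(posAfterSeekAndPositionedRead()); // 6
    }
}
```

The same two checks can be run against a real `FSDataInputStream` wrapping the in-memory stream once Hadoop is on the test classpath.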

Reference: Accumulo