字符串到字节输入流的迭代器

Hei*_*erg 5 java scala java-stream

我想将字符串迭代器转换为字节输入流。通常,我可以通过在 a 中附加所有字符串StringBuilder并执行以下操作来做到这一点: InputStream is = new ByteArrayInputStream(sb.toString().getBytes());

但我想懒惰地这样做,因为我的可迭代对象是由 Spark 提供的,并且长度可能非常大。我发现这个例子在 Scala 中做到了:

  def rowsToInputStream(rows: Iterator[String], delimiter: String): InputStream = {
  val bytes: Iterator[Byte] = rows.map { row =>
    (row + "\n").getBytes
  }.flatten

  new InputStream {
    override def read(): Int = if (bytes.hasNext) {
      bytes.next & 0xff // bitwise AND - make the signed byte an unsigned int from 0-255
    } else {
      -1
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

但是我找不到将其转换为 Java 的简单方法。我将iterator使用转换为流Spliterators.spliteratorUnknownSize,然后getBytes输出一个无法轻易展平的数组。总的来说,它变得非常混乱。

在 Java 中有一种优雅的方法可以做到这一点吗?

Hol*_*ger 5

如果你想支持InputStream快速的批量操作,你应该实现
\nint read(byte[] b, int off, int len)方法,它不仅可以被读取的代码直接调用InputStream,而且也是继承方法的后端

\n\n
    \n
  • int read(byte b[])
  • \n
  • long skip(long n)
  • \n
  • byte[] readAllBytes()(JDK\xc2\xa09)
  • \n
  • int readNBytes(byte[] b, int off, int len)(JDK\xc2\xa09)
  • \n
  • long transferTo(OutputStream out)(JDK\xc2\xa09)
  • \n
  • byte[] readNBytes(int len)(JDK\xc2\xa011)
  • \n
  • void skipNBytes\xe2\x80\x8b(long n)(JDK\xc2\xa014)
  • \n
\n\n

当该方法得到有效的实施时,它将更有效地工作。

\n\n
public class StringIteratorInputStream extends InputStream {\n    private CharsetEncoder encoder;\n    private Iterator<String> strings;\n    private CharBuffer current;\n    private ByteBuffer pending;\n\n    public StringIteratorInputStream(Iterator<String> it) {\n        this(it, Charset.defaultCharset());\n    }\n    public StringIteratorInputStream(Iterator<String> it, Charset cs) {\n        encoder = cs.newEncoder();\n        strings = Objects.requireNonNull(it);\n    }\n\n    @Override\n    public int read() throws IOException {\n        for(;;) {\n            if(pending != null && pending.hasRemaining())\n                return pending.get() & 0xff;\n            if(!ensureCurrent()) return -1;\n            if(pending == null) pending = ByteBuffer.allocate(4096);\n            else pending.compact();\n            encoder.encode(current, pending, !strings.hasNext());\n            pending.flip();\n        }\n    }\n\n    private boolean ensureCurrent() {\n        while(current == null || !current.hasRemaining()) {\n            if(!strings.hasNext()) return false;\n            current = CharBuffer.wrap(strings.next());\n        }\n        return true;\n    }\n\n    @Override\n    public int read(byte[] b, int off, int len) {\n        // Objects.checkFromIndexSize(off, len, b.length); // JDK 9\n        int transferred = 0;\n        if(pending != null && pending.hasRemaining()) {\n            boolean serveByBuffer = pending.remaining() >= len;\n            pending.get(b, off, transferred = Math.min(pending.remaining(), len));\n            if(serveByBuffer) return transferred;\n            len -= transferred;\n            off += transferred;\n        }\n        ByteBuffer bb = ByteBuffer.wrap(b, off, len);\n        while(bb.hasRemaining() && ensureCurrent()) {\n            int r = bb.remaining();\n            encoder.encode(current, bb, !strings.hasNext());\n            transferred += r - bb.remaining();\n        }\n        return transferred == 0? -1: transferred;\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

A基本上是解决方案中、和变量ByteBuffer的组合。但是,只有当调用者真正使用该方法读取单个字节时,才会初始化缓冲区。否则,代码将创建一个包装调用者提供的目标缓冲区的对象,以将字符串直接编码到其中。byte buf[];int pos;int count;pendingint read()ByteBuffer

\n\n

遵循CharBuffer相同的概念,只是对于char序列。在此代码中,它始终是字符串之一的包装器,而不是具有自己存储空间的缓冲区。因此,在最好的情况下,此InputStream实现会将所有迭代器提供的字符串编码到调用者提供的缓冲区中,而无需中间存储。

\n\n

这个概念确实已经暗示了惰性处理,因为没有中间存储,只有适合调用者提供的缓冲区的内容,换句话说,将从迭代器中获取调用者请求的内容。

\n