Hei*_*erg 5 java scala java-stream
我想将字符串迭代器转换为字节输入流。通常,我可以通过在 a 中附加所有字符串StringBuilder并执行以下操作来做到这一点:
InputStream is = new ByteArrayInputStream(sb.toString().getBytes());
但我想懒惰地这样做,因为我的可迭代对象是由 Spark 提供的,并且长度可能非常大。我发现这个例子在 Scala 中做到了:
def rowsToInputStream(rows: Iterator[String], delimiter: String): InputStream = {
val bytes: Iterator[Byte] = rows.map { row =>
(row + "\n").getBytes
}.flatten
new InputStream {
override def read(): Int = if (bytes.hasNext) {
bytes.next & 0xff // bitwise AND - make the signed byte an unsigned int from 0-255
} else {
-1
}
}
}
Run Code Online (Sandbox Code Playgroud)
但是我找不到将其转换为 Java 的简单方法。我将iterator使用转换为流Spliterators.spliteratorUnknownSize,然后getBytes输出一个无法轻易展平的数组。总的来说,它变得非常混乱。
在 Java 中有一种优雅的方法可以做到这一点吗?
如果你想支持InputStream快速的批量操作,你应该实现
\nint read(byte[] b, int off, int len)方法,它不仅可以被读取的代码直接调用InputStream,而且也是继承方法的后端
int read(byte b[])long skip(long n)byte[] readAllBytes()(JDK\xc2\xa09)int readNBytes(byte[] b, int off, int len)(JDK\xc2\xa09)long transferTo(OutputStream out)(JDK\xc2\xa09)byte[] readNBytes(int len)(JDK\xc2\xa011)void skipNBytes\xe2\x80\x8b(long n)(JDK\xc2\xa014)当该方法得到有效的实施时,它将更有效地工作。
\n\npublic class StringIteratorInputStream extends InputStream {\n private CharsetEncoder encoder;\n private Iterator<String> strings;\n private CharBuffer current;\n private ByteBuffer pending;\n\n public StringIteratorInputStream(Iterator<String> it) {\n this(it, Charset.defaultCharset());\n }\n public StringIteratorInputStream(Iterator<String> it, Charset cs) {\n encoder = cs.newEncoder();\n strings = Objects.requireNonNull(it);\n }\n\n @Override\n public int read() throws IOException {\n for(;;) {\n if(pending != null && pending.hasRemaining())\n return pending.get() & 0xff;\n if(!ensureCurrent()) return -1;\n if(pending == null) pending = ByteBuffer.allocate(4096);\n else pending.compact();\n encoder.encode(current, pending, !strings.hasNext());\n pending.flip();\n }\n }\n\n private boolean ensureCurrent() {\n while(current == null || !current.hasRemaining()) {\n if(!strings.hasNext()) return false;\n current = CharBuffer.wrap(strings.next());\n }\n return true;\n }\n\n @Override\n public int read(byte[] b, int off, int len) {\n // Objects.checkFromIndexSize(off, len, b.length); // JDK 9\n int transferred = 0;\n if(pending != null && pending.hasRemaining()) {\n boolean serveByBuffer = pending.remaining() >= len;\n pending.get(b, off, transferred = Math.min(pending.remaining(), len));\n if(serveByBuffer) return transferred;\n len -= transferred;\n off += transferred;\n }\n ByteBuffer bb = ByteBuffer.wrap(b, off, len);\n while(bb.hasRemaining() && ensureCurrent()) {\n int r = bb.remaining();\n encoder.encode(current, bb, !strings.hasNext());\n transferred += r - bb.remaining();\n }\n return transferred == 0? -1: transferred;\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n\nA基本上是解决方案中、和变量ByteBuffer的组合。但是,只有当调用者真正使用该方法读取单个字节时,才会初始化缓冲区。否则,代码将创建一个包装调用者提供的目标缓冲区的对象,以将字符串直接编码到其中。byte buf[];int pos;int count;pendingint read()ByteBuffer
遵循CharBuffer相同的概念,只是对于char序列。在此代码中,它始终是字符串之一的包装器,而不是具有自己存储空间的缓冲区。因此,在最好的情况下,此InputStream实现会将所有迭代器提供的字符串编码到调用者提供的缓冲区中,而无需中间存储。
这个概念确实已经暗示了惰性处理,因为没有中间存储,只有适合调用者提供的缓冲区的内容,换句话说,将从迭代器中获取调用者请求的内容。
\n| 归档时间: |
|
| 查看次数: |
646 次 |
| 最近记录: |