我正在使用Apache Arrow进行非常基本的实验,主要是关于使用 Arrow 的 IPC 格式(到文件)、Parquet 格式(到文件)和 IPC 格式(通过 JNI 流)在 Java、C++、Python 之间传递一些数据。
C++ 和 Python 看起来有些用处,但 Java 部分确实让我感到困扰。
可悲的是,Java 文档有点有限,但尽管承认这些隐藏文档(不是 TOC 的一部分)的警告,我只是想VectorSchemaRoot
从以前编写的文件中填充一些。
忽略 99% 的实验代码,以下最小示例显示了我的问题,我只是创建一些数据,将其写出(可以很好地在 Python 中导入)并尝试用 Java 读回。
但是数据所有权似乎妨碍了我想要实现的目标。
也许保留VectorSchemaRoot
作为核心条目的想法(例如:我的所有数据都在这里)是某种错误的用法,但我不确定有什么替代方法。为什么我要手动保留IntVectors
和合作。当这个类会做同样的事情并提供一些 API 来使用它时。
package arrow.test;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import com.google.common.collect.ImmutableList;
public class Run {
public static void main(String[] args) {
ImmutableList.Builder<Field> builder = ImmutableList.builder();
Field intField = new Field("i", FieldType.nullable(new ArrowType.Int(32, true)), null);
builder.add(intField);
Field boolField = new Field("b", FieldType.nullable(new ArrowType.Bool()), null);
builder.add(boolField);
RootAllocator sourceRootAlloc = new RootAllocator(Integer.MAX_VALUE);
Schema sourceSchema = new Schema(builder.build(), null);
VectorSchemaRoot sourceRoot = VectorSchemaRoot.create(sourceSchema, sourceRootAlloc);
FieldVector vector= sourceRoot.getVector("i");
IntVector intVector = (IntVector) vector;
intVector.allocateNew(5);
intVector.set(0, 0);
intVector.set(1, 1);
intVector.set(2, 2);
intVector.set(3, 3);
intVector.set(4, 4);
intVector.setValueCount(5);
vector = sourceRoot.getVector("b");
BitVector bitVector = (BitVector) vector;
bitVector.allocateNew(5);
bitVector.set(0, 1);
bitVector.set(1, 1);
bitVector.set(2, 0);
bitVector.set(3, 0);
bitVector.set(4, 1);
bitVector.setValueCount(5);
sourceRoot.setRowCount(5);
System.out.println("before writing");
System.out.println(sourceRoot.contentToTSVString());
// WRITE
// -----
try {
FileOutputStream fileOut = new FileOutputStream("out", /*!overwrite=append*/false);
ArrowFileWriter writer = new ArrowFileWriter(sourceRoot, null, fileOut.getChannel());
writer.start();
writer.writeBatch();
writer.end();
writer.close();
fileOut.close();
}
catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// READ
// ----
FileInputStream fileInputStream;
RootAllocator targetRootAlloc = new RootAllocator(Integer.MAX_VALUE);
VectorSchemaRoot targetRoot = null;
try {
fileInputStream = new FileInputStream("out");
ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(), targetRootAlloc);
targetRoot = reader.getVectorSchemaRoot();
reader.loadNextBatch();
System.out.println("before closing stream");
System.out.println(targetRoot.contentToTSVString());
reader.close();
fileInputStream.close();
System.out.println("after closing stream");
System.out.println(targetRoot.contentToTSVString());
}
catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Run Code Online (Sandbox Code Playgroud)
运行它(使用 Java 11 和-Dio.netty.tryReflectionSetAccessible=true
文档)会导致:
... irrelevant warning
...
before writing
i b
0 true
1 true
2 false
3 false
4 true
before closing stream
i b
0 true
1 true
2 false
3 false
4 true
after closing stream
Exception in thread "main" java.lang.IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0))
at io.netty.buffer.ArrowBuf.checkIndexD(ArrowBuf.java:337)
at io.netty.buffer.ArrowBuf.chk(ArrowBuf.java:324)
at io.netty.buffer.ArrowBuf.getByte(ArrowBuf.java:526)
at org.apache.arrow.vector.BaseFixedWidthVector.isSet(BaseFixedWidthVector.java:776)
at org.apache.arrow.vector.IntVector.getObject(IntVector.java:143)
at org.apache.arrow.vector.IntVector.getObject(IntVector.java:39)
at org.apache.arrow.vector.VectorSchemaRoot.contentToTSVString(VectorSchemaRoot.java:268)
at arrow.test.Run.main(Run.java:102)
Run Code Online (Sandbox Code Playgroud)
而 Python 可以轻松做到:
import pyarrow as pa
print(pa.__version__)
buf = pa.ipc.open_file("out")
print(buf.schema)
df = buf.read_pandas()
print(df)
Run Code Online (Sandbox Code Playgroud)
输出:
0.17.1
i: int32
b: bool
i b
0 0 True
1 1 True
2 2 False
3 3 False
4 4 True
Run Code Online (Sandbox Code Playgroud)
现在看来,ArrowFileReader
尽管分配器是在其范围之外定义的,但仍然有责任清理数据。targetRoot.getRowCount()
是正确的,但每个VectorField的大小为0
。
我尝试了很多替代方案(未显示)关于使用VectorUnloader
和VectorLoader
从某些读取器 VectorSchemaRoot(本地范围)batch.cloneWithTransfer(targetAlloc)
加载,使用(全局范围)转移所有权并加载到目标 VectorSchemaRoot(全局范围),但这通常不起作用由于A buffer can only be associated between two allocators that share the same root
要么我对使用模式有很多误解,要么(可能不是,但感觉就像)Java 部分已经很糟糕了。
有任何想法吗?
今天遇到同样的问题,解决了,出现这个问题是因为vectorSchemaRoot无法写入值,可能是直接内存无效或者schema错误,如果你有微信我们可以聊聊,微信id:DawnWang
归档时间: |
|
查看次数: |
707 次 |
最近记录: |