Arrow + Java:填充 VectorSchemaRoot(来自流/文件)| 内存所有权| 使用模式

sas*_*cha 6 java apache-arrow

我正在使用Apache Arrow进行非常基本的实验,主要是关于使用 Arrow 的 IPC 格式(到文件)、Parquet 格式(到文件)和 IPC 格式(通过 JNI 流)在 Java、C++、Python 之间传递一些数据。

C++ 和 Python 看起来有些用处,但 Java 部分确实让我感到困扰。

可悲的是,Java 文档有点有限,但尽管承认这些隐藏文档(不是 TOC 的一部分)的警告,我只是想VectorSchemaRoot从以前编写的文件中填充一些。

忽略 99% 的实验代码,以下最小示例显示了我的问题,我只是创建一些数据,将其写出(可以很好地在 Python 中导入)并尝试用 Java 读回。

但是数据所有权似乎妨碍了我想要实现的目标。

也许保留VectorSchemaRoot作为核心条目的想法(例如:我的所有数据都在这里)是某种错误的用法,但我不确定有什么替代方法。为什么我要手动保留IntVectors和合作。当这个类会做同样的事情并提供一些 API 来使用它时。

package arrow.test;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

import com.google.common.collect.ImmutableList;

public class Run {

  public static void main(String[] args) {

    ImmutableList.Builder<Field> builder = ImmutableList.builder();
    
    Field intField = new Field("i", FieldType.nullable(new ArrowType.Int(32, true)), null);
    builder.add(intField);      
    
    Field boolField = new Field("b", FieldType.nullable(new ArrowType.Bool()), null);
    builder.add(boolField);     
    
    RootAllocator sourceRootAlloc = new RootAllocator(Integer.MAX_VALUE);
    Schema sourceSchema = new Schema(builder.build(), null);
    VectorSchemaRoot sourceRoot  = VectorSchemaRoot.create(sourceSchema, sourceRootAlloc);
    
    FieldVector vector= sourceRoot.getVector("i");
    IntVector intVector = (IntVector) vector;
    intVector.allocateNew(5);
    intVector.set(0, 0);
    intVector.set(1, 1);
    intVector.set(2, 2);
    intVector.set(3, 3);
    intVector.set(4, 4);
    intVector.setValueCount(5);
    
    vector = sourceRoot.getVector("b");
    BitVector bitVector = (BitVector) vector;
    bitVector.allocateNew(5);
    bitVector.set(0, 1);
    bitVector.set(1, 1);
    bitVector.set(2, 0);
    bitVector.set(3, 0);
    bitVector.set(4, 1);
    bitVector.setValueCount(5);
    
    sourceRoot.setRowCount(5);

    System.out.println("before writing");
    System.out.println(sourceRoot.contentToTSVString());        
    
    // WRITE
    // -----
    
    try {
      FileOutputStream fileOut = new FileOutputStream("out", /*!overwrite=append*/false);
        ArrowFileWriter writer = new ArrowFileWriter(sourceRoot, null, fileOut.getChannel());
      writer.start();
      writer.writeBatch();
          writer.end();
          writer.close();
          fileOut.close();  
    }
    catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    
    // READ
    // ----
    
    FileInputStream fileInputStream;
      
    RootAllocator targetRootAlloc = new RootAllocator(Integer.MAX_VALUE);
    VectorSchemaRoot targetRoot = null;
    
    try {
      fileInputStream = new FileInputStream("out");

      ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(), targetRootAlloc);
      
      targetRoot = reader.getVectorSchemaRoot();    
      reader.loadNextBatch();       
      
      System.out.println("before closing stream");
      System.out.println(targetRoot.contentToTSVString());

      reader.close();
      fileInputStream.close();          

      System.out.println("after closing stream");
      System.out.println(targetRoot.contentToTSVString());      
    }
    catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }           
  }
}
Run Code Online (Sandbox Code Playgroud)

运行它(使用 Java 11 和-Dio.netty.tryReflectionSetAccessible=true文档)会导致:

... irrelevant warning
...
before writing
i   b
0   true
1   true
2   false
3   false
4   true

before closing stream
i   b
0   true
1   true
2   false
3   false
4   true

after closing stream
Exception in thread "main" java.lang.IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0))
  at io.netty.buffer.ArrowBuf.checkIndexD(ArrowBuf.java:337)
  at io.netty.buffer.ArrowBuf.chk(ArrowBuf.java:324)
  at io.netty.buffer.ArrowBuf.getByte(ArrowBuf.java:526)
  at org.apache.arrow.vector.BaseFixedWidthVector.isSet(BaseFixedWidthVector.java:776)
  at org.apache.arrow.vector.IntVector.getObject(IntVector.java:143)
  at org.apache.arrow.vector.IntVector.getObject(IntVector.java:39)
  at org.apache.arrow.vector.VectorSchemaRoot.contentToTSVString(VectorSchemaRoot.java:268)
  at arrow.test.Run.main(Run.java:102)
Run Code Online (Sandbox Code Playgroud)

而 Python 可以轻松做到:

import pyarrow as pa
print(pa.__version__)

buf = pa.ipc.open_file("out")
print(buf.schema)

df = buf.read_pandas()
print(df)
Run Code Online (Sandbox Code Playgroud)

输出:

0.17.1
i: int32
b: bool
   i      b
0  0   True
1  1   True
2  2  False
3  3  False
4  4   True
Run Code Online (Sandbox Code Playgroud)

现在看来ArrowFileReader 尽管分配器是在其范围之外定义的,但仍然有责任清理数据targetRoot.getRowCount()是正确的,但每个VectorField的大小为0

我尝试了很多替代方案(未显示)关于使用VectorUnloaderVectorLoader从某些读取器 VectorSchemaRoot(本地范围)batch.cloneWithTransfer(targetAlloc)加载,使用(全局范围)转移所有权并加载到目标 VectorSchemaRoot(全局范围),但这通常不起作用由于A buffer can only be associated between two allocators that share the same root

要么我对使用模式有很多误解,要么(可能不是,但感觉就像)Java 部分已经很糟糕了。

有任何想法吗?

Cne*_*ork 0

今天遇到同样的问题,解决了,出现这个问题是因为vectorSchemaRoot无法写入值,可能是直接内存无效或者schema错误,如果你有微信我们可以聊聊,微信id:DawnWang