tso*_*orn 3 rust parquet apache-arrow
我知道如何将 Parquet 文件读入 `Vec<Row>`：
extern crate parquet;
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::{fs, sync::Arc};
use parquet::column::writer::ColumnWriter;
use parquet::{
file::{
properties::WriterProperties,
writer::{FileWriter, SerializedFileWriter},
},
schema::parser::parse_message_type,
schema::types::TypePtr
};
use parquet::record::Row;
use parquet::record::RowAccessor;
use std::fs::File;
use std::io::prelude::*;
use std::path::Path;
use std::path::PathBuf;
/// Loads every record of the Parquet file at `in_path` into memory.
///
/// Returns the decoded rows together with the file's root schema (a
/// `TypePtr`, the same type `to_parquet` accepts). The row count stored in
/// the file metadata is printed to stdout.
///
/// # Panics
/// Panics if the file cannot be opened, cannot be read as Parquet, or a row
/// iterator cannot be created.
fn read_parquet(in_path: &Path) -> (Vec<Row>, TypePtr) {
    let source = File::open(in_path).unwrap();
    let parquet_reader = SerializedFileReader::new(source).unwrap();

    // Materialise all rows up front; the iterator only borrows the reader
    // for the duration of this statement.
    let all_rows: Vec<Row> = parquet_reader.get_row_iter(None).unwrap().collect();

    // File-level metadata supplies both the reported row count and the schema.
    let file_meta = parquet_reader.metadata().file_metadata();
    println!("num rows: {}", file_meta.num_rows());
    let root_schema = file_meta.schema_descr().root_schema_ptr();

    (all_rows, root_schema)
}
Run Code Online (Sandbox Code Playgroud)
现在，如何把同样的数据写回 Parquet 文件？我使用的是 `parquet` crate。
/// Creates (or truncates) `out_path` and opens a Parquet writer for `schema`.
///
/// NOTE: writing `data` is not implemented yet; the rows are currently
/// dropped. To write them, open a row group with `writer.next_row_group()`,
/// take each leaf column in schema order with `next_column()`, feed that
/// column's values to the typed writer's `write_batch`, and `close_column`
/// each column. Then `close_row_group`.
///
/// The writer is always closed before returning. Without `close()` the
/// file footer (schema + row-group metadata + trailing magic) is never
/// written and the output is not a readable Parquet file.
///
/// # Panics
/// Panics if the file cannot be created, or if the writer cannot be opened
/// or closed.
fn to_parquet(data: Vec<Row>, schema: TypePtr, out_path: &Path) {
    let props = Arc::new(WriterProperties::builder().build());
    // `out_path` is already a `&Path`; no extra borrow needed.
    let file = fs::File::create(out_path).unwrap();
    let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
    // TODO: write the contents of `data` here (see the doc comment above).
    // Finalise the file so it carries a valid footer.
    writer.close().unwrap();
}
Run Code Online (Sandbox Code Playgroud)
这是一个有效的简单示例:
use std::{fs, path::Path, sync::Arc};
use parquet::{column::writer::ColumnWriter, data_type::ByteArray, file::{
properties::WriterProperties,
writer::{FileWriter, SerializedFileWriter},
}, schema::parser::parse_message_type};
/// Writes a small two-column Parquet file (`./sample.parquet`) and checks
/// that it starts with the `PAR1` magic bytes.
///
/// The file is deliberately left on disk so it can be inspected afterwards
/// (e.g. with `parquet-read`).
#[test]
fn sample_test() {
    let path = Path::new("./sample.parquet");
    let message_type = "
  message schema {
    REQUIRED INT32 b;
    REQUIRED BINARY msg (UTF8);
  }
";
    let schema = Arc::new(parse_message_type(message_type).unwrap());
    let props = Arc::new(WriterProperties::builder().build());
    let file = fs::File::create(path).unwrap();

    let data = vec![(10, "A"), (20, "B"), (30, "C"), (40, "D")];
    // Parquet is columnar: split the records into one batch per column so
    // each column chunk is written with a single `write_batch` call.
    let keys: Vec<i32> = data.iter().map(|&(key, _)| key).collect();
    let msgs: Vec<ByteArray> = data.iter().map(|&(_, msg)| ByteArray::from(msg)).collect();

    let mut file_writer = SerializedFileWriter::new(file, schema, props).unwrap();
    // Put all records in ONE row group instead of opening a new row group per
    // record, which would produce many tiny, inefficient row groups.
    let mut row_group_writer = file_writer.next_row_group().unwrap();
    // Number of rows written, taken from the first column only (counting every
    // column would multiply the row count by the number of columns).
    let mut rows: i64 = 0;

    // Column 0: `b` (INT32). REQUIRED and flat, so no def/rep levels.
    if let Some(mut col_writer) = row_group_writer.next_column().unwrap() {
        match col_writer {
            ColumnWriter::Int32ColumnWriter(ref mut typed) => {
                rows = typed.write_batch(&keys, None, None).unwrap() as i64;
            }
            _ => panic!("column `b` is expected to be INT32"),
        }
        row_group_writer.close_column(col_writer).unwrap();
    }

    // Column 1: `msg` (BINARY, UTF8). REQUIRED and flat, so no def/rep levels.
    if let Some(mut col_writer) = row_group_writer.next_column().unwrap() {
        match col_writer {
            ColumnWriter::ByteArrayColumnWriter(ref mut typed) => {
                let written = typed.write_batch(&msgs, None, None).unwrap() as i64;
                assert_eq!(written, rows, "all columns must hold the same number of values");
            }
            _ => panic!("column `msg` is expected to be BINARY"),
        }
        row_group_writer.close_column(col_writer).unwrap();
    }

    file_writer.close_row_group(row_group_writer).unwrap();
    file_writer.close().unwrap();
    println!("Wrote {}", rows);
    assert_eq!(rows, data.len() as i64);

    let bytes = fs::read(path).unwrap();
    assert_eq!(&bytes[0..4], &[b'P', b'A', b'R', b'1']);
}
Run Code Online (Sandbox Code Playgroud)
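关键是使用通过调用 `writer.next_row_group()` 获得的 `RowGroupWriter`：按 schema 顺序对每一列调用 `next_column()` 取得列写入器，用 `write_batch` 写入该列的数据后调用 `close_column()`；所有列写完后调用 `close_row_group()`，最后调用 `close()` 写出文件尾部元数据。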
创建 `sample.parquet` 文件后，可以运行以下命令检查其内容：
$ parquet-read ./sample.parquet
{b: 10, msg: "A"}
{b: 20, msg: "B"}
{b: 30, msg: "C"}
{b: 40, msg: "D"}
Run Code Online (Sandbox Code Playgroud)
有关 `parquet-read` 二进制程序的详细信息，请参阅 arrow-rs 的 README.md；简而言之，可以按如下方式安装它：
$ git clone https://github.com/apache/arrow-rs
$ cd arrow-rs/parquet
$ cargo install --path . --features cli
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3252 次 |
| 最近记录: |