如何使用 serde_json 从 JSON 数组内部流式传输元素?

R S*_*Sun 7 rust serde-json

我有一个 5GB JSON 文件,它是具有固定结构的对象数组:

[
  {
    "first": "John",
    "last": "Doe",
    "email": "john.doe@yahoo.com"
  },
  {
    "first": "Anne",
    "last": "Ortha",
    "email": "anne.ortha@hotmail.com"
  },
  ....
]
Run Code Online (Sandbox Code Playgroud)

我知道我可以尝试使用如何使用 Serde 使用顶级数组反序列化 JSON?中所示的代码来解析此文件。:

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct User {
    first: String,
    last: String,
    email: String,
}

let users: Vec<User> = serde_json::from_str(file)?;
Run Code Online (Sandbox Code Playgroud)

有多个问题:

  1. 首先将其作为一个字符串整体读取
  2. 读取为字符串后,它将其转换为结构向量User(我不想要这样)

我尝试了如何从 Rust 中的文件/流中延迟读取多个 JSON 值?但它会在打印任何内容之前读取整个文件,并在循环内立即打印整个结构。我期待循环中一次有一个对象:

在此输入图像描述

理想情况下,(已解析的)用户对象的解析和处理应该在两个单独的线程/任务/例程中同时发生,或者通过使用通道同时发生。

use*_*342 6

从 JSON 数组流式传输元素是可能的,但需要一些跑腿工作。您必须自己跳过前导[和间断,,并检测最后的]. 要解析单个数组元素,您需要使用StreamDeserializer并从中提取单个项目(这样您就可以删除它并重新获得对 IO 读取器的控制)。例如:

use serde::de::DeserializeOwned;
use serde_json::{self, Deserializer};
use std::io::{self, Read};

fn read_skipping_ws(mut reader: impl Read) -> io::Result<u8> {
    loop {
        let mut byte = 0u8;
        reader.read_exact(std::slice::from_mut(&mut byte))?;
        if !byte.is_ascii_whitespace() {
            return Ok(byte);
        }
    }
}

fn invalid_data(msg: &str) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, msg)
}

fn deserialize_single<T: DeserializeOwned, R: Read>(reader: R) -> io::Result<T> {
    let next_obj = Deserializer::from_reader(reader).into_iter::<T>().next();
    match next_obj {
        Some(result) => result.map_err(Into::into),
        None => Err(invalid_data("premature EOF")),
    }
}

fn yield_next_obj<T: DeserializeOwned, R: Read>(
    mut reader: R,
    at_start: &mut bool,
) -> io::Result<Option<T>> {
    if !*at_start {
        *at_start = true;
        if read_skipping_ws(&mut reader)? == b'[' {
            // read the next char to see if the array is empty
            let peek = read_skipping_ws(&mut reader)?;
            if peek == b']' {
                Ok(None)
            } else {
                deserialize_single(io::Cursor::new([peek]).chain(reader)).map(Some)
            }
        } else {
            Err(invalid_data("`[` not found"))
        }
    } else {
        match read_skipping_ws(&mut reader)? {
            b',' => deserialize_single(reader).map(Some),
            b']' => Ok(None),
            _ => Err(invalid_data("`,` or `]` not found")),
        }
    }
}

pub fn iter_json_array<T: DeserializeOwned, R: Read>(
    mut reader: R,
) -> impl Iterator<Item = Result<T, io::Error>> {
    let mut at_start = false;
    std::iter::from_fn(move || yield_next_obj(&mut reader, &mut at_start).transpose())
}
Run Code Online (Sandbox Code Playgroud)

用法示例:

fn main() {
    let data = r#"[
  {
    "first": "John",
    "last": "Doe",
    "email": "john.doe@yahoo.com"
  },
  {
    "first": "Anne",
    "last": "Ortha",
    "email": "anne.ortha@hotmail.com"
  }
]"#;
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize, Debug)]
    struct User {
        first: String,
        last: String,
        email: String,
    }

    for user in iter_json_array(io::Cursor::new(&data)) {
        let user: User = user.unwrap();
        println!("{:?}", user);
    }
}
Run Code Online (Sandbox Code Playgroud)

操场

在生产中使用它时,您可以将其打开为而File不是将其读取为字符串。一如既往,不要忘记将 包裹FileBufReader.