Rust Streams:如何将 TryStream<Ok = Vec<Item>, Error = MyErrorType>> 转换为 TryStream<Ok = Item, Error = MyErrorType>

Jas*_*erV 1 asynchronous stream rust async-await

我正在使用板条箱玩 Rust 中的流futures-util

我正在创建一个流,该流在每个步骤中都从分页 API 请求新页面。每个页面的项目基本上都是一个向量。显而易见,每次调用服务器请求新页面都可能会失败。

我不想返回向量流,而是想返回一个项目流,当尝试从失败的页面获取项目时,该项目流会产生错误(并因此关闭该流)。

这是我当前的功能:

    pub fn search(
        &self,
        query: Query,
    ) -> impl TryStream<Ok = Vec<Item>, Error = MyError> {
        let initial_state = (query, None);
        stream::try_unfold(initial_state, |(query, page_token)| async move {
            let (items, next_page_token) = fetch_page(&query, page_token).await?;
            if items.len() <= 0 {
                Ok(None)
            } else {
                Ok(Some((items, (query, next_page_token))))
            }
        })
    }

Run Code Online (Sandbox Code Playgroud)

我想修改它,使返回类型为impl TryStream<Ok = Item, Error = MyError>

有可能吗?

Cha*_*man 5

您可以使用try_flatten(),但是它要求每个内部流也是 a TryStream,因此您需要映射每个项目:

use futures_util::stream::TryStreamExt;

pub fn search(query: Query) -> impl TryStream<Ok = Item, Error = MyError> {
    let initial_state = (query, None);
    stream::try_unfold(initial_state, |(query, page_token)| async move {
        let (items, next_page_token) = fetch_page(&query, page_token).await?;
        if items.len() <= 0 {
            Ok(None)
        } else {
            Ok(Some((items, (query, next_page_token))))
        }
    })
    .map_ok(|items| stream::iter(items.into_iter().map(|item| Ok(item))))
    .try_flatten()
}
Run Code Online (Sandbox Code Playgroud)

但请注意,返回impl Stream<Item = Result<Item, MyError>>比更好impl TryStream,因为impl Stream也会自动实现TryStream,但反之则不然。