如何创建一个流,其中项目基于流之前返回的项目?

She*_*ter 5 asynchronous future stream rust

我有一个函数可以生成一个 futures::Stream基于参数的函数。我想多次调用此函数并将流压平在一起。使问题复杂化的事实是,我想将流返回的值作为参数反馈给原始函数。

具体来说,我有一个函数可以将数字流返回为零:

fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}
Run Code Online (Sandbox Code Playgroud)

我想从 5 开始调用这个函数。还应该为返回的每个奇数调用该函数。总调用集numbers_down_to_zero将是:

fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}
Run Code Online (Sandbox Code Playgroud)

产生总流

numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);
Run Code Online (Sandbox Code Playgroud)

有哪些技术可以实现这一点?

Frx*_*rem 5

你可以用 来解决这个问题unfold。您将有一个“状态”结构,它保留“基本流”(在本例中倒数为零)和将生成新流的项目列表,并将其用作保留状态的unfold参数展开的同时。

\n\n

这样编译器就不必推理生命周期所有权,因为async每次调用闭包时状态都可以移动到块中。

\n\n
/// Base stream (counting down to zero).\nfn f(n: i32) -> impl Stream<Item = i32> {\n    stream::iter((0..n).rev())\n}\n\n/// "Recursive" stream\nfn g(n: i32) -> impl Stream<Item = i32> {\n    /// Helper struct to keep state while unfolding\n    struct StreamState<S> {\n        inner_stream: S,\n        item_queue: VecDeque<i32>,\n    }\n\n    // Build helper struct\n    let state = StreamState {\n        inner_stream: f(n),\n        item_queue: VecDeque::new(),\n    };\n\n    // Unfold with state\n    stream::unfold(state, |mut state| async move {\n        loop {\n            if let Some(item) = state.inner_stream.next().await {\n                // Iterate inner stream, and potentially push item to queue\n                if item % 2 == 1 {\n                    state.item_queue.push_front(item);\n                }\n                break Some((item, state));\n            } else if let Some(item) = state.item_queue.pop_back() {\n                // If inner stream is exhausted, produce new stream from queue\n                // and repeat loop\n                state.inner_stream = f(item);\n            } else {\n                // If queue is empty, we are done\n                break None;\n            }\n        }\n    })\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

完整的游乐场示例

\n\n

StreamExt::next要求内部流实现Unpin,因此 it\xe2\x80\x99 不能用于任意流。您始终可以使用Box::pin(stream)代替,因为Pin<Box<T>>isUnpin并实现了Streamif T: Stream

\n


edw*_*rdw 3

通过(ab)使用 async/await,该genawaiter板条箱成功地模仿了当今稳定的 Rust 中的生成器语法。结合futures::pin_mut堆栈上的 pin 值,这是一个免分配且与任意流兼容的解决方案:

//# futures = "0.3"
//# genawaiter = { version = "0.2", features = ["futures03"] }
//# tokio = { version = "0.2", features = ["full"] }
use futures::{
    pin_mut,
    stream::{self, Stream, StreamExt},
};
use genawaiter::{generator_mut, stack::Co};
use std::collections::VecDeque;

async fn g(n: i32, co: Co<'_, i32>) {
    let mut seeds = VecDeque::from(vec![n]);
    while let Some(seed) = seeds.pop_back() {
        let stream = f(seed);
        pin_mut!(stream);
        while let Some(x) = stream.next().await {
            if x % 2 != 0 {
                seeds.push_front(x);
            }
            co.yield_(x).await;
        }
    }
}

fn f(n: i32) -> impl Stream<Item = i32> {
    stream::iter((0..n).rev())
}

#[tokio::main]
async fn main() {
    generator_mut!(stream, |co| g(5, co));
    stream
        .for_each(|v| async move {
            println!("v: {}", v);
        })
        .await;
}
Run Code Online (Sandbox Code Playgroud)

一些缺点:

  • generator_mut宏内部有一个不安全的调用
  • 接口有点漏。调用者可以看到一些实现细节。

通过一次堆分配,genawaiter::rc::Gen可以摆脱所有这些。但同样,随着分配的进行,还有其他选择。

use futures::{
    pin_mut,
    stream::{Stream, StreamExt},
};
use genawaiter::rc::Gen;
use std::collections::VecDeque;

fn g(n: i32) -> impl Stream<Item = i32> {
    Gen::new(|co| async move {
        let mut seeds = VecDeque::from(vec![n]);
        while let Some(seed) = seeds.pop_back() {
            let stream = f(seed);
            pin_mut!(stream);
            while let Some(x) = stream.next().await {
                if x % 2 != 0 {
                    seeds.push_front(x);
                }
                co.yield_(x).await;
            }
        }
    })
}
Run Code Online (Sandbox Code Playgroud)