She*_*ter 5 asynchronous future stream rust
我有一个函数可以生成一个 futures::Stream基于参数的函数。我想多次调用此函数并将流压平在一起。使问题复杂化的事实是,我想将流返回的值作为参数反馈给原始函数。
具体来说,我有一个函数可以将数字流返回为零:
fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
Run Code Online (Sandbox Code Playgroud)
我想从 5 开始调用这个函数。还应该为返回的每个奇数调用该函数。总调用集numbers_down_to_zero将是:
fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
Run Code Online (Sandbox Code Playgroud)
产生总流
numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);
Run Code Online (Sandbox Code Playgroud)
有哪些技术可以实现这一点?
你可以用 来解决这个问题unfold。您将有一个“状态”结构,它保留“基本流”(在本例中倒数为零)和将生成新流的项目列表,并将其用作保留状态的unfold参数展开的同时。
这样编译器就不必推理生命周期所有权,因为async每次调用闭包时状态都可以移动到块中。
/// Base stream (counting down to zero).\nfn f(n: i32) -> impl Stream<Item = i32> {\n stream::iter((0..n).rev())\n}\n\n/// "Recursive" stream\nfn g(n: i32) -> impl Stream<Item = i32> {\n /// Helper struct to keep state while unfolding\n struct StreamState<S> {\n inner_stream: S,\n item_queue: VecDeque<i32>,\n }\n\n // Build helper struct\n let state = StreamState {\n inner_stream: f(n),\n item_queue: VecDeque::new(),\n };\n\n // Unfold with state\n stream::unfold(state, |mut state| async move {\n loop {\n if let Some(item) = state.inner_stream.next().await {\n // Iterate inner stream, and potentially push item to queue\n if item % 2 == 1 {\n state.item_queue.push_front(item);\n }\n break Some((item, state));\n } else if let Some(item) = state.item_queue.pop_back() {\n // If inner stream is exhausted, produce new stream from queue\n // and repeat loop\n state.inner_stream = f(item);\n } else {\n // If queue is empty, we are done\n break None;\n }\n }\n })\n}\nRun Code Online (Sandbox Code Playgroud)\n\n\n\nStreamExt::next要求内部流实现Unpin,因此 it\xe2\x80\x99 不能用于任意流。您始终可以使用Box::pin(stream)代替,因为Pin<Box<T>>isUnpin并实现了Streamif T: Stream。
通过(ab)使用 async/await,该genawaiter板条箱成功地模仿了当今稳定的 Rust 中的生成器语法。结合futures::pin_mut堆栈上的 pin 值,这是一个免分配且与任意流兼容的解决方案:
//# futures = "0.3"
//# genawaiter = { version = "0.2", features = ["futures03"] }
//# tokio = { version = "0.2", features = ["full"] }
use futures::{
pin_mut,
stream::{self, Stream, StreamExt},
};
use genawaiter::{generator_mut, stack::Co};
use std::collections::VecDeque;
async fn g(n: i32, co: Co<'_, i32>) {
let mut seeds = VecDeque::from(vec![n]);
while let Some(seed) = seeds.pop_back() {
let stream = f(seed);
pin_mut!(stream);
while let Some(x) = stream.next().await {
if x % 2 != 0 {
seeds.push_front(x);
}
co.yield_(x).await;
}
}
}
fn f(n: i32) -> impl Stream<Item = i32> {
stream::iter((0..n).rev())
}
#[tokio::main]
async fn main() {
generator_mut!(stream, |co| g(5, co));
stream
.for_each(|v| async move {
println!("v: {}", v);
})
.await;
}
Run Code Online (Sandbox Code Playgroud)
一些缺点:
generator_mut宏内部有一个不安全的调用通过一次堆分配,genawaiter::rc::Gen可以摆脱所有这些。但同样,随着分配的进行,还有其他选择。
use futures::{
pin_mut,
stream::{Stream, StreamExt},
};
use genawaiter::rc::Gen;
use std::collections::VecDeque;
fn g(n: i32) -> impl Stream<Item = i32> {
Gen::new(|co| async move {
let mut seeds = VecDeque::from(vec![n]);
while let Some(seed) = seeds.pop_back() {
let stream = f(seed);
pin_mut!(stream);
while let Some(x) = stream.next().await {
if x % 2 != 0 {
seeds.push_front(x);
}
co.yield_(x).await;
}
}
})
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1680 次 |
| 最近记录: |