我无法弄清楚如何解决因使用组合器调整流而导致的编译错误。
下面的Rust Playground展示了一个相当小的例子:
use futures::prelude::*;
use futures::StreamExt;
#[derive(Debug)]
pub enum Input {
A,
B(i32),
C(u16),
}
#[derive(Debug)]
enum Output {
Int(i32),
Short(u16),
}
pub struct StreamMaker;
impl StreamMaker {
/// make a stream with a series of inputs
pub fn create(self) -> impl Stream<Item = Input> {
stream::iter(vec![Input::A, Input::C(1u16), Input::B(2)])
}
}
/// consume the creator, and make output messages for a subset
pub fn adapt_stream(creator: StreamMaker) -> impl Stream<Item = Output> {
let mut upstream = creator.create();
upstream.filter_map(|message| async move {
match message {
Input::A => None,
Input::B(v) => Some(Output::Int(v)),
Input::C(v) => Some(Output::Short(v)),
}
})
}
#[tokio::main]
async fn main() -> Result<(), ()> {
let creator = StreamMaker {};
let mut stream = adapt_stream(creator);
while let Some(message) = stream.next().await {
println!("message: {:?}", message)
}
Ok(())
}
Run Code Online (Sandbox Code Playgroud)
编译失败:
use futures::prelude::*;
use futures::StreamExt;
#[derive(Debug)]
pub enum Input {
A,
B(i32),
C(u16),
}
#[derive(Debug)]
enum Output {
Int(i32),
Short(u16),
}
pub struct StreamMaker;
impl StreamMaker {
/// make a stream with a series of inputs
pub fn create(self) -> impl Stream<Item = Input> {
stream::iter(vec![Input::A, Input::C(1u16), Input::B(2)])
}
}
/// consume the creator, and make output messages for a subset
pub fn adapt_stream(creator: StreamMaker) -> impl Stream<Item = Output> {
let mut upstream = creator.create();
upstream.filter_map(|message| async move {
match message {
Input::A => None,
Input::B(v) => Some(Output::Int(v)),
Input::C(v) => Some(Output::Short(v)),
}
})
}
#[tokio::main]
async fn main() -> Result<(), ()> {
let creator = StreamMaker {};
let mut stream = adapt_stream(creator);
while let Some(message) = stream.next().await {
println!("message: {:?}", message)
}
Ok(())
}
Run Code Online (Sandbox Code Playgroud)
我可以pin_mut!(stream);在 main 中放入 a ,但我希望能够将其推到上游。
如果您不希望您的流的使用者必须自己固定它,您需要返回一个实现该Unpin特征的流,这意味着即使在固定之后也可以安全地在内存中移动。
pub fn adapt_stream(creator: StreamMaker) -> impl Stream<Item = Output> + Unpin {
// add Unpin trait --^
Run Code Online (Sandbox Code Playgroud)
添加这个,你的编译器应该抱怨返回值没有实现Unpin。这是因为async move { ... }块没有实现Unpin,因为它们可能是自引用的(例如包含对它们拥有的变量的引用)。解决此问题的最通用方法是Pin<Box<_>>使用Box::pin构造函数将流固定到堆上:
pub fn adapt_stream(creator: StreamMaker) -> impl Stream<Item = Output> + Unpin {
let mut upstream = creator.create();
Box::pin(upstream.filter_map(|message| async move {
// ^-- pin stream to heap
match message {
Input::A => None,
Input::B(v) => Some(Output::Int(v)),
Input::C(v) => Some(Output::Short(v)),
}
}))
}
Run Code Online (Sandbox Code Playgroud)
由于我们现在返回一个Pin<Box<_>>指向流的指针,该指针可以安全地在内存中移动,而内部流则保持在同一位置。
| 归档时间: |
|
| 查看次数: |
857 次 |
| 最近记录: |