如何解决因使用组合器调整流而导致的编译错误“未为 GenFuture 实现 Unpin”?

arb*_*rbe 2 rust

我无法弄清楚如何解决因使用组合器调整流而导致的编译错误。

下面的Rust Playground展示了一个相当小的例子:

use futures::prelude::*;
use futures::StreamExt;

#[derive(Debug)]
pub enum Input {
    A,
    B(i32),
    C(u16),
}

#[derive(Debug)]
enum Output {
    Int(i32),
    Short(u16),
}

pub struct StreamMaker;

impl StreamMaker {
    /// make a stream with a series of inputs
    pub fn create(self) -> impl Stream<Item = Input> {
        stream::iter(vec![Input::A, Input::C(1u16), Input::B(2)])
    }
}

/// consume the creator, and make output messages for a subset
pub fn adapt_stream(creator: StreamMaker) -> impl Stream<Item = Output> {
    let mut upstream = creator.create();
    upstream.filter_map(|message| async move {
        match message {
            Input::A => None,
            Input::B(v) => Some(Output::Int(v)),
            Input::C(v) => Some(Output::Short(v)),
        }
    })
}

#[tokio::main]
async fn main() -> Result<(), ()> {
    let creator = StreamMaker {};
    let mut stream = adapt_stream(creator);

    while let Some(message) = stream.next().await {
        println!("message: {:?}", message)
    }

    Ok(())
}
Run Code Online (Sandbox Code Playgroud)

编译失败:

use futures::prelude::*;
use futures::StreamExt;

#[derive(Debug)]
pub enum Input {
    A,
    B(i32),
    C(u16),
}

#[derive(Debug)]
enum Output {
    Int(i32),
    Short(u16),
}

pub struct StreamMaker;

impl StreamMaker {
    /// make a stream with a series of inputs
    pub fn create(self) -> impl Stream<Item = Input> {
        stream::iter(vec![Input::A, Input::C(1u16), Input::B(2)])
    }
}

/// consume the creator, and make output messages for a subset
pub fn adapt_stream(creator: StreamMaker) -> impl Stream<Item = Output> {
    let mut upstream = creator.create();
    upstream.filter_map(|message| async move {
        match message {
            Input::A => None,
            Input::B(v) => Some(Output::Int(v)),
            Input::C(v) => Some(Output::Short(v)),
        }
    })
}

#[tokio::main]
async fn main() -> Result<(), ()> {
    let creator = StreamMaker {};
    let mut stream = adapt_stream(creator);

    while let Some(message) = stream.next().await {
        println!("message: {:?}", message)
    }

    Ok(())
}
Run Code Online (Sandbox Code Playgroud)

我可以pin_mut!(stream);在 main 中放入 a ,但我希望能够将其推到上游。

Frx*_*rem 7

如果您不希望您的流的使用者必须自己固定它,您需要返回一个实现该Unpin特征的流,这意味着即使在固定之后也可以安全地在内存中移动。

pub fn adapt_stream(creator: StreamMaker) -> impl Stream<Item = Output> + Unpin {
//                                                      add Unpin trait --^
Run Code Online (Sandbox Code Playgroud)

添加这个,你的编译器应该抱怨返回值没有实现Unpin。这是因为async move { ... }块没有实现Unpin,因为它们可能是自引用的(例如包含对它们拥有的变量的引用)。解决此问题的最通用方法是Pin<Box<_>>使用Box::pin构造函数将流固定到堆上:

pub fn adapt_stream(creator: StreamMaker) -> impl Stream<Item = Output> + Unpin {
    let mut upstream = creator.create();
    Box::pin(upstream.filter_map(|message| async move {
//  ^-- pin stream to heap
        match message {
            Input::A => None,
            Input::B(v) => Some(Output::Int(v)),
            Input::C(v) => Some(Output::Short(v)),
        }
    }))
}
Run Code Online (Sandbox Code Playgroud)

由于我们现在返回一个Pin<Box<_>>指向流的指针,该指针可以安全地在内存中移动,而内部流则保持在同一位置。

完整的操场示例