为什么 poll() 中的这个 Delay future 在我的自定义 Stream 类型中不起作用?

Oui*_*uim 2 rust rust-tokio

我想每秒打印一次“你好”。

引用文档:

期货使用基于投票的模型。未来的消费者反复调用 poll 函数。未来然后试图完成。如果未来能够完成,则返回 Async::Ready(value)。如果未来由于被内部资源(例如 TCP 套接字)阻塞而无法完成,则返回 Async::NotReady。

如果s return is ,我的poll函数返回,但没有任何内容打印到标准输出。NotReadyDelayNotReady

use futures::{Async, Future, Stream}; // 0.1.25
use std::time::{Duration, Instant};
use tokio::timer::Delay; // 0.1.15

struct SomeStream;

impl Stream for SomeStream {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
        let when = Instant::now() + Duration::from_millis(1000);
        let mut task = Delay::new(when).map_err(|e| eprintln!("{:?}", e));
        match task.poll() {
            Ok(Async::Ready(value)) => {}
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Err(err) => return Err(()),
        }
        Ok(Async::Ready(Some("Hello".to_string())))
    }
}

fn main() {
    let s = SomeStream;
    let future = s
        .for_each(|item| {
            println!("{:?}", item);
            Ok(())
        })
        .map_err(|e| {});
    tokio::run(future);
}
Run Code Online (Sandbox Code Playgroud)

E_n*_*ate 5

这里的主要问题是缺少状态管理。Delay每次轮询流时,您都在创建一个新的未来,而不是一直持有它直到它得到解决。这将导致永远看不到流中的任何项目,因为这些期货只被轮询一次,NotReady每次都可能产生。

您需要跟踪您的类型中的延迟未来SomeStream。在这种情况下,可以使用一个选项,以便也确定我们是否需要创建新的延迟。

#[derive(Debug, Default)]
struct SomeStream {
    delay: Option<Delay>,
}
Run Code Online (Sandbox Code Playgroud)

SomeStream::poll具有更好的错误处理和更惯用的结构的 的后续代码将变成这样:

impl Stream for SomeStream {
    type Item = String;
    type Error = Box<dyn std::error::Error + Send + Sync>; // generic error

    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
        let delay = self.delay.get_or_insert_with(|| {
            let when = Instant::now() + Duration::from_millis(1000);
            Delay::new(when)
        });

        match delay.poll() {
            Ok(Async::Ready(value)) => {
                self.delay = None;
                Ok(Async::Ready(Some("Hello".to_string())))
            },
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(err) => Err(err.into()),
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

或者,更好的是,使用try_ready!宏,它可以NotReady用更少的样板来返回错误和信号。

fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
    let delay = self.delay.get_or_insert_with(|| {
        let when = Instant::now() + Duration::from_millis(1000);
        Delay::new(when)
    });

    try_ready!(delay.poll());

    // tick!
    self.delay = None;
    Ok(Async::Ready(Some("Hello".to_string())))
}
Run Code Online (Sandbox Code Playgroud)

游乐场