为什么在返回`NotReady`后没有重复调用`Future :: poll`?

Mar*_*aTh 5 multithreading future rust

请考虑以下代码

extern crate futures;

use std::sync::{atomic, Arc};
use futures::*;

struct F(Arc<atomic::AtomicBool>);

impl Future for F {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        println!("Check if flag is set");
        if self.0.load(atomic::Ordering::Relaxed) {
            Ok(Async::Ready(()))
        } else {
            Ok(Async::NotReady)
        }
    }
}

fn main() {
    let flag = Arc::new(atomic::AtomicBool::new(false));
    let future = F(flag.clone());
    ::std::thread::spawn(move || {
        ::std::thread::sleep_ms(10);
        println!("set flag");
        flag.store(true, atomic::Ordering::Relaxed);
    });
    // ::std::thread::sleep_ms(20);
    let result = future.wait();
    println!("result: {:?}", result);
}
Run Code Online (Sandbox Code Playgroud)

生成的线程设置了一个未来等待的标志.我们也睡眠产生的线程,所以初始.poll()调用.wait()是在标志设置之前.这导致.wait()无限地阻止(看似).如果我们取消注释另一个thread::sleep_ms,则.wait()返回并打印出result(()).

我希望当前的线程尝试通过poll多次调用来解决未来,因为我们阻止了当前的线程.但是,这种情况并没有发生.

我试图阅读一些文档,似乎问题是该线程是从第一次park获取后编辑的.但是,我不清楚为什么会这样,或者如何解决这个问题.NotReadypoll

我错过了什么?

Art*_*mGr 7

为什么你需要停下等待的未来,而不是反复轮询?答案很明显,恕我直言.因为在一天结束时它更快更有效!

为了反复轮询未来(可能被称为" 忙碌等待 "),图书馆必须决定是经常做还是很少做,而且回答都不令人满意.经常这样做,你浪费CPU周期,很少做,代码反应迟钝.

所以是的,你需要在等待某事时停放任务,然后在等待时取消停放.像这样:

#![allow(deprecated)]

extern crate futures;

use std::sync::{Arc, Mutex};
use futures::*;
use futures::task::{park, Task};

struct Status {
    ready: bool,
    task: Option<Task>,
}

#[allow(dead_code)]
struct F(Arc<Mutex<Status>>);

impl Future for F {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        println!("Check if flag is set");
        let mut status = self.0.lock().expect("!lock");
        if status.ready {
            Ok(Async::Ready(()))
        } else {
            status.task = Some(park());
            Ok(Async::NotReady)
        }
    }
}

#[test]
fn test() {
    let flag = Arc::new(Mutex::new(Status {
                                       ready: false,
                                       task: None,
                                   }));
    let future = F(flag.clone());
    ::std::thread::spawn(move || {
        ::std::thread::sleep_ms(10);
        println!("set flag");
        let mut status = flag.lock().expect("!lock");
        status.ready = true;
        if let Some(ref task) = status.task {
            task.unpark()
        }
    });
    let result = future.wait();
    println!("result: {:?}", result);
}
Run Code Online (Sandbox Code Playgroud)

请注意,Future::poll这里做了几件事:它正在检查外部条件并且它正在停止任务,因此可以进行比赛,例如:

  1. poll检查变量,并发现它是false;
  2. 外部代码将变量设置为true;
  3. 外部代码检查任务是否停放并发现它不是;
  4. poll公园的任务,但热潮!现在为时已晚,没有人会再停车了.

为了避免任何比赛,我使用了a Mutex来同步这些交互.

PS如果您只需要将一个线程结果包装成a Future然后考虑使用该oneshot通道:它已经Receiver实现了该Future接口.

  • *答案很明显,恕我直言* - 如果答案很明显,人们不需要问这个问题^ _ ^ (4认同)