在将来-rs中封装阻塞I/O的最佳方法是什么?

Mom*_*ain 9 performance future rust

我阅读了tokio文档,我想知道将来封装昂贵的同步I/O的最佳方法是什么.

使用reactor框架,我们可以获得绿色线程模型的优势:一些OS线程通过执行程序处理大量并发任务.

未来的tokio模型是需求驱动的,这意味着未来本身将轮询其内部状态以提供有关其完成的信息; 允许背压和取消功能.据我了解,未来的投票阶段必须是非阻塞才能运作良好.

I/OI想要封装可以看作是一个长期的原子和昂贵的操作.理想情况下,独立任务将执行I/O,并且相关联的未来将轮询I/O线程以获得完成状态.

我看到的两个唯一选择是:

  • 将阻塞I/O包含poll在将来的功能中.
  • 产生OS线程以执行I/O并使用未来机制轮询其状态,如文档中所示

据我所知,这两种解决方案都不是最优的,并且没有充分利用绿色线程模型(首先不在文档中建议,其次不通过reactor框架提供的执行程序).还有其他解决方案吗?

She*_*ter 12

理想情况下,独立任务将执行I/O,并且相关联的未来将轮询I/O线程以获得完成状态.

是的,这就是Tokio推荐的内容,以及为future-cpupooltokio-threadpool创建的包装箱.请注意,这不仅限于I/O,但对于任何长时间运行的同步任务都有效!

在这种情况下,您计划在池中运行闭包.池本身执行工作以检查阻塞闭包是否已完成并满足Future特征.

use futures::{future, Future}; // 0.1.27
use futures_cpupool::CpuPool; // 0.1.8
use std::thread;
use std::time::Duration;

fn main() {
    let pool = CpuPool::new(8);

    let a = pool.spawn_fn(|| {
        thread::sleep(Duration::from_secs(3));
        future::ok::<_, ()>(3)
    });
    let b = pool.spawn_fn(|| {
        thread::sleep(Duration::from_secs(1));
        future::ok::<_, ()>(1)
    });

    let c = a.join(b).map(|(a, b)| a + b);

    let result = c.wait();
    println!("{:?}", result);
}
Run Code Online (Sandbox Code Playgroud)

请注意,这不是一种有效的睡眠方式,它只是一个阻塞操作的占位符.如果你真的需要睡觉,可以使用期货计时器tokio-timer之类的东西.

你可以看到总时间只有3秒:

$ time ./target/debug/example
Ok(4)

real    0m3.021s
user    0m0.008s
sys     0m0.009s
Run Code Online (Sandbox Code Playgroud)

同样,您可以使用tokio-threadpool获得相同的结果:

use std::{thread, time::Duration};
use tokio::{prelude::*, runtime::Runtime}; // 0.1.20
use tokio_threadpool; // 0.1.14

fn delay_for(seconds: u64) -> impl Future<Item = u64, Error = tokio_threadpool::BlockingError> {
    future::poll_fn(move || {
        tokio_threadpool::blocking(|| {
            thread::sleep(Duration::from_secs(seconds));
            seconds
        })
    })
}

fn main() {
    let a = delay_for(3);
    let b = delay_for(1);
    let sum = a.join(b).map(|(a, b)| a + b);

    let mut runtime = Runtime::new().expect("Unable to start the runtime");
    let result = runtime.block_on(sum);
    println!("{:?}", result);
}
Run Code Online (Sandbox Code Playgroud)

这两种解决方案都不是最优的,并且没有充分利用绿色线程模型

这是正确的 - 因为你没有异步的东西!您正在尝试将两种不同的方法结合起来,并且必须在某处进行翻译才能在它们之间进行转换.

第二个不通过reactor框架提供的执行程序

我不确定你的意思.上面的例子中只有一个执行者; 由隐式创建的blocking.线程池有一些内部逻辑,用于检查线程是否已完成,但只应在用户的执行程序执行时触发blocking.