在 Rust 中使用 tokio 代替 OS 线程有什么好处

use*_*897 9 multithreading threadpool rust rust-cargo rust-tokio

我正在尝试用 Rust 制作一个多线程 tcp 通信程序

这个想法是主线程上存在一个侦听套接字,并且当连接进入时,工作由工作线程处理

我之前使用了 Rust 书中找到的 ThreadPool 方法,但据我了解,tokio 能够“自动”将工作分配给池中的线程

我对操作系统线程和 tokio 任务之间的区别感到困惑(主要是因为您用来spawn创建两者)

这是一些代码

fn main() {
    println!("Hello World!");
    let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 103, 7)), 2048);
    println!("socket -> {}", socket);

    // You need to use the tokio runtime to execute async functions
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let listener = StartListen::new(socket).await.unwrap();
    });
}
Run Code Online (Sandbox Code Playgroud)

StartListen在另一个文件中定义了

// Defines the StartListen class
pub struct StartListen {
    listener: TcpListener,
}


// Implementation for StartListen class
impl StartListen {
    pub async fn new(socket: SocketAddr) -> Result<StartListen, StartListenError>{
        println!("Attempting to listen");
        let bind_result = TcpListener::bind(socket).await;
        sleep(Duration::from_secs(5)).await;
        match bind_result {
            Ok(listener) => {
                println!("Server is listening on {}", &socket);
                Ok(StartListen { listener })
            }
            Err(err) => Err(StartListenError::BindError(err)),
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

为了添加更多上下文,这个想法是这个套接字期望有两种类型的消息

// Defines the types of messages to expect
pub enum RequestType {
    RequestWork {
        request_message: String,
        parameter: String,
        sender_timestamp: String,
    },
    CloseConnection {
        initial_timestamp: String,
        final_timestamp: String,
    },
    Invalid(String),
}
Run Code Online (Sandbox Code Playgroud)

我还没有添加handle_connection方法,我是否必须定义handle_connection 才能坐在循环中并生成任务?

    pub async fn accept_connections(&self) {
        loop {
            let (mut stream, addr) = self.listener.accept().await.unwrap();
            println!("New connection from {}", addr);

            // Spawn a new task to handle the connection
            tokio::spawn(async move {
                let mut buffer = [0; 1024];
                loop {
                    let n = match stream.read(&mut buffer).await {
                        Ok(n) if n == 0 => return, // Connection closed
                        Ok(n) => n,
                        Err(e) => {
                            eprintln!("Error reading from socket: {}", e);
                            return;
                        }
                    };

                    // Convert the received message into a RequestType
                    let message = String::from_utf8_lossy(&buffer[..n]);
Run Code Online (Sandbox Code Playgroud)

Fin*_*nis 19

你已经非常接近理解了。你问的是正确的问题。

让我简要介绍一下可能的服务器实现:

  • 同步。一个线程完成所有事情,每个发送/接收操作都是阻塞的。
    • 缺点很明显:只能同时处理一个连接。
  • 多线程。每个连接生成一个线程。
    • 上一点的缺点得到了缓解,我们现在可以同时使用多个连接
    • 但是:我们系统上的线程数量取决于用户交互
    • 大量线程可能会破坏系统,甚至出现 DOS 攻击,正是使用该机制来运行内存不足的系统。他们进行大量连接尝试,迫使服务器分配大量线程,然后不再响应。
  • 异步。这个想法是,你基本上有一个任务列表,只要有时间,一个线程就会在它们之间来回跳转。
    • 您可以将其视为协程的工作队列;基本上是可以执行的函数列表,但可以暂停并放回到队列中,直到发生某个事件(例如接收数据)
    • 修复了多线程的 DOS 问题,因为在单个线程中的工作包之间来回跳转比生成大量线程要高效得多
  • 多线程异步。与异步基本相同,但具有多个工作线程。
    • 需要大量的线程安全性,这就是为什么这种方法非常适合 Rust 编程语言
    • 的异步反应器的默认类型tokio。如果你简单地这样做#[tokio::main],这就是你会得到的。
    • 具有固定数量的线程(很可能与系统上的核心数量相同),因此对于系统过载而言更稳健
    • 可以高效地处理大量并发连接。

不过,还有很多微妙之处需要考虑。我强烈建议阅读tokio 教程,它解释了其中的许多概念。之后,我建议阅读async 书籍

例如,一些重要的微妙之处:

  • 在任何情况下都不要使用 std 的同步原语来阻止任务。阻塞任务将阻塞所有事情,因为异步调度是非抢占式的,这意味着调度程序无法取消调度任务。它只能在.await点处切换任务,因此每当您等待某件事时,请确保它在某个.await点内。(例外:短暂的std::sync::Mutex,请参见此处
  • 不要使用异步任务进行繁重的计算。虽然从技术上讲它不会阻塞,但繁重的计算会在两点之间引入很长的时间.await。相反,使用 将其卸载到真实线程spawn_blocking,这会.await向工作线程引入一个点并在不同的线程池上执行实际计算。