TCP 侦听器的有限并发

nz_*_*_21 1 concurrency rust

我有一个 TCP 监听器,如下所示:

pub async fn run(port: i32) -> Result<(), Box<dyn std::error::Error>> {
    let addr = format!("127.0.0.1:{}", port);
    info!("Listening on address: {}", addr);

    let listener = TcpListener::bind(addr).await?;
    loop {
        let (socket, _) = listener.accept().await?;
        tokio::spawn(async move {
            _ = handle_connection(socket).await;
        });
    }
}

Run Code Online (Sandbox Code Playgroud)

我们假设handle_connections这是一个昂贵的 IO 绑定操作。

我想说的是,服务器一次只运行 运行中nhandle_connections任务。

实现这一目标的惯用方法是什么?

FuturesOrder我的第一个想法是在侦听器中创建一个列表,用于收集handle_connection任务。我们定期 collect()在此列表上运行n任务并从列表中清除这些任务。

但这感觉太hacky了。好奇是否有更惯用的方法?

kmd*_*eko 6

我建议使用一个Semaphore并发原语,它可以发出多个“许可证”来访问关键部分。使用 tokio 提供的看起来像这样:

use std::sync::Arc;
use tokio::sync::Semaphore;

pub async fn run(port: i32) -> Result<(), Box<dyn std::error::Error>> {
    let addr = format!("127.0.0.1:{}", port);
    info!("Listening on address: {}", addr);

    let semaphore = Arc::new(Semaphore::new(10));
    let listener = TcpListener::bind(addr).await?;
    loop {
        let (socket, _) = listener.accept().await?;
        let semaphore = semaphore.clone();
        tokio::spawn(async move {
            let _permit = semaphore.acquire().await.unwrap();
            _ = handle_connection(socket).await;
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

请注意,这将允许多个连接被accept()连接并排队等待处理它。相反,如果您想在容量满时阻止接受新连接,只需提前获得许可即可:

pub async fn run(port: i32) -> Result<(), Box<dyn std::error::Error>> {
    let addr = format!("127.0.0.1:{}", port);
    info!("Listening on address: {}", addr);

    let semaphore = Arc::new(Semaphore::new(10));
    let listener = TcpListener::bind(addr).await?;
    loop {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        let (socket, _) = listener.accept().await?;
        tokio::spawn(async move {
            // force a move so the permit is only released when the task is complete
            let _permit = permit;
            _ = handle_connection(socket).await;
        });
    }
}
Run Code Online (Sandbox Code Playgroud)