我有一个 TCP 监听器,如下所示:
pub async fn run(port: i32) -> Result<(), Box<dyn std::error::Error>> {
let addr = format!("127.0.0.1:{}", port);
info!("Listening on address: {}", addr);
let listener = TcpListener::bind(addr).await?;
loop {
let (socket, _) = listener.accept().await?;
tokio::spawn(async move {
_ = handle_connection(socket).await;
});
}
}
Run Code Online (Sandbox Code Playgroud)
我们假设handle_connections这是一个昂贵的 IO 绑定操作。
我想说的是,服务器一次只运行 运行中n的handle_connections任务。
实现这一目标的惯用方法是什么?
FuturesOrder我的第一个想法是在侦听器中创建一个列表,用于收集handle_connection任务。我们定期 collect()在此列表上运行n任务并从列表中清除这些任务。
但这感觉太hacky了。好奇是否有更惯用的方法?
我建议使用一个Semaphore并发原语,它可以发出多个“许可证”来访问关键部分。使用 tokio 提供的看起来像这样:
use std::sync::Arc;
use tokio::sync::Semaphore;
pub async fn run(port: i32) -> Result<(), Box<dyn std::error::Error>> {
let addr = format!("127.0.0.1:{}", port);
info!("Listening on address: {}", addr);
let semaphore = Arc::new(Semaphore::new(10));
let listener = TcpListener::bind(addr).await?;
loop {
let (socket, _) = listener.accept().await?;
let semaphore = semaphore.clone();
tokio::spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
_ = handle_connection(socket).await;
});
}
}
Run Code Online (Sandbox Code Playgroud)
请注意,这将允许多个连接被accept()连接并排队等待处理它。相反,如果您想在容量满时阻止接受新连接,只需提前获得许可即可:
pub async fn run(port: i32) -> Result<(), Box<dyn std::error::Error>> {
let addr = format!("127.0.0.1:{}", port);
info!("Listening on address: {}", addr);
let semaphore = Arc::new(Semaphore::new(10));
let listener = TcpListener::bind(addr).await?;
loop {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let (socket, _) = listener.accept().await?;
tokio::spawn(async move {
// force a move so the permit is only released when the task is complete
let _permit = permit;
_ = handle_connection(socket).await;
});
}
}
Run Code Online (Sandbox Code Playgroud)