Why does an async TcpStream block?

Asked by mat*_*ner (1) · tags: asynchronous, rust, rust-tokio

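I'm working on a project that implements a distributed key-value store in Rust. I wrote the server-side code using Tokio's async runtime. I've run into a problem where my async code seems to block, so when I have multiple connections to the server, only one TcpStream is handled. I'm new to writing async code, both in general and in Rust, but I thought that if there was no activity on a given TCP stream, other streams would be accepted and processed.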

Is my understanding of async wrong, or am I using tokio incorrectly?

Here is my entry point:

use std::error::Error;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use env_logger;
use log::{debug, info};
use structopt::StructOpt;
use tokio::net::TcpListener;

extern crate blue;

use blue::ipc::message;
use blue::store::args;
use blue::store::cluster::{Cluster, NodeRole};
use blue::store::deserialize::deserialize_store;
use blue::store::handler::handle_stream;
use blue::store::wal::WriteAheadLog;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

    let opt = args::Opt::from_args();
    let addr = SocketAddr::from_str(format!("{}:{}", opt.host, opt.port).as_str())?;
    let role = NodeRole::from_str(opt.role.as_str()).unwrap();
    let leader_addr = match role {
        NodeRole::Leader => addr,
        NodeRole::Follower => SocketAddr::from_str(opt.follow.unwrap().as_str())?,
    };

    let wal_name = addr.to_string().replace(".", "").replace(":", "");
    let wal_full_name = format!("wal{}.log", wal_name);
    let wal_path = PathBuf::from(wal_full_name);
    let mut wal = match wal_path.exists() {
        true => {
            info!("Existing WAL found");
            WriteAheadLog::open(&wal_path)?
        }
        false => {
            info!("Creating WAL");
            WriteAheadLog::new(&wal_path)?
        }
    };
    debug!("WAL: {:?}", wal);

    let store_name = addr.to_string().replace(".", "").replace(":", "");
    let store_pth = format!("{}.pb", store_name);
    let store_path = Path::new(&store_pth);
    let mut store = match store_path.exists() {
        true => deserialize_store(store_path)?,
        false => message::Store::default(),
    };

    let listener = TcpListener::bind(addr).await?;
    let cluster = Cluster::new(addr, &role, leader_addr, &mut wal, &mut store).await?;

    let store_path = Arc::new(store_path);
    let store = Arc::new(Mutex::new(store));

    let wal = Arc::new(Mutex::new(wal));
    let cluster = Arc::new(Mutex::new(cluster));
    info!("Blue launched. Waiting for incoming connection");

    loop {
        let (stream, addr) = listener.accept().await?;
        info!("Incoming request from {}", addr);
        let store = Arc::clone(&store);
        let store_path = Arc::clone(&store_path);
        let wal = Arc::clone(&wal);
        let cluster = Arc::clone(&cluster);
        handle_stream(stream, store, store_path, wal, cluster, &role).await?;
    }
}

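Below is my handler (handle_stream from above). I've left out all the arms of match input because I don't think they're needed to demonstrate the problem (the full code for that part is here, if it helps: https://github.com/matthewmturner/Bradfield-Distributed-Systems/blob/main/blue/src/store/handler.rs).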

Specifically, the point where it blocks is the line let input = async_read_message::<message::Request>(&mut stream).await;

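This is where the server waits for a message from a client or from another server in the cluster. The behavior I'm currently seeing is that once a client has connected to the server, the server never receives any of the requests to add other nodes to the cluster; it only handles the client stream.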

use std::io;
use std::net::{SocketAddr, TcpStream};
use std::path::Path;
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use log::{debug, error, info};
use serde_json::json;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream as asyncTcpStream;

use super::super::ipc::message;
use super::super::ipc::message::request::Command;
use super::super::ipc::receiver::async_read_message;
use super::super::ipc::sender::{async_send_message, send_message};
use super::cluster::{Cluster, NodeRole};
use super::serialize::persist_store;
use super::wal::WriteAheadLog;

// TODO: Why isnt async working? I.e. connecting servers after client is connected stays on client stream.
pub async fn handle_stream<'a>(
    mut stream: asyncTcpStream,
    store: Arc<Mutex<message::Store>>,
    store_path: Arc<&Path>,
    wal: Arc<Mutex<WriteAheadLog<'a>>>,
    cluster: Arc<Mutex<Cluster>>,
    role: &NodeRole,
) -> io::Result<()> {
    loop {
        info!("Handling stream: {:?}", stream);
        let input = async_read_message::<message::Request>(&mut stream).await;
        debug!("Input: {:?}", input);
        match input {
        ...
        }
    }
}

Here is the code for async_read_message:

pub async fn async_read_message<M: Message + Default>(
    stream: &mut asyncTcpStream,
) -> io::Result<M> {
    let mut len_buf = [0u8; 4];
    debug!("Reading message length");
    stream.read_exact(&mut len_buf).await?;
    let len = i32::from_le_bytes(len_buf);
    let mut buf = vec![0u8; len as usize];
    debug!("Reading message");
    stream.read_exact(&mut buf).await?;
    let user_input = M::decode(&mut buf.as_slice())?;
    debug!("Received message: {:?}", user_input);
    Ok(user_input)
}
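async_read_message expects each message to be framed as a 4-byte little-endian length followed by exactly that many payload bytes, and its read_exact future stays pending until the whole frame has arrived. That is why the handler sits on this line while a client is idle; a pending future by itself does not block the runtime. The sketch below shows the same framing with blocking std::io so it can be tried without Tokio or a protobuf crate. write_frame and read_frame are hypothetical names (the question does not show async_send_message, so the sender side is an assumption), decoding into a message type is left out, and the length is read as u32 rather than i32, an assumed change because a negative i32 cast with len as usize would request an enormous buffer.

```rust
use std::io::{self, Read, Write};

/// Hypothetical sender side: a 4-byte little-endian length prefix,
/// followed by the payload bytes.
pub fn write_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > u32::MAX as usize {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "frame too large"));
    }
    let len = payload.len() as u32;
    w.write_all(&len.to_le_bytes())?;
    w.write_all(payload)?;
    w.flush()
}

/// Blocking mirror of async_read_message: read the prefix, then exactly
/// that many payload bytes. Decoding the payload is left to the caller.
pub fn read_frame<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    // u32 instead of i32: a length prefix can never be negative.
    let len = u32::from_le_bytes(len_buf) as usize;
    let mut buf = vec![0u8; len];
    r.read_exact(&mut buf)?;
    Ok(buf)
}
```

Reading from an exhausted source returns an UnexpectedEof error, which is the same condition handle_stream sees when a peer disconnects mid-loop.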

Answer by kmd*_*eko (6)

Your problem is in how you handle messages after a client connects:

handle_stream(stream, store, store_path, wal, cluster, &role).await?;

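The .await means your listening loop waits for handle_stream to return, but (making some assumptions) that function doesn't return until the client disconnects, so the loop never gets back to accept() in the meantime. What you want is tokio::spawn, which starts a new task that runs independently of the loop: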

tokio::spawn(handle_stream(stream, store, store_path, wal, cluster, &role));
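The difference between awaiting the handler and spawning it can be reproduced without Tokio. The sketch below is a blocking, thread-per-connection analogue: std::thread::spawn stands in for tokio::spawn, which is a swapped-in technique for illustration, not the async fix itself. serve_sequential calls the handler inline like the loop in the question, so an idle first client starves every later one, while serve_concurrent hands each connection to its own thread and goes straight back to accepting. The echo handler is a hypothetical stand-in for handle_stream.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

/// Hypothetical stand-in for handle_stream: echo bytes until the peer closes.
/// Like handle_stream, it only returns once the client disconnects.
fn echo(mut stream: TcpStream) -> io::Result<()> {
    let mut buf = [0u8; 1024];
    loop {
        let n = stream.read(&mut buf)?;
        if n == 0 {
            return Ok(()); // peer closed the connection
        }
        stream.write_all(&buf[..n])?;
    }
}

/// Same shape as the question's loop: the next accept only happens after
/// the current handler has returned. Accept errors are skipped in this sketch.
pub fn serve_sequential(listener: TcpListener) {
    for stream in listener.incoming().flatten() {
        let _ = echo(stream);
    }
}

/// Each connection runs on its own thread, so the loop immediately
/// returns to accepting new connections.
pub fn serve_concurrent(listener: TcpListener) {
    for stream in listener.incoming().flatten() {
        thread::spawn(move || {
            let _ = echo(stream);
        });
    }
}
```

With serve_sequential, a second client can connect (the kernel queues it) but gets no reply while the first client stays idle; with serve_concurrent it is answered immediately.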

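You may have to change some of the argument types to avoid borrowed lifetimes (for example, &role and Arc<&Path> both borrow locals of main, and WriteAheadLog<'a> carries a lifetime parameter): tokio::spawn requires the future to be 'static, because the task's lifetime is decoupled from the scope that spawned it.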
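A minimal sketch of what "owned arguments" means in practice. It uses std::thread::spawn because that function has the same Send + 'static bound on what it captures as tokio::spawn has on its future, and it runs without extra crates. Store, NodeRole, handle and run_handlers are simplified hypothetical stand-ins for the question's types, and NodeRole implementing Clone is an assumption. The Arc<&Path> becomes Arc<PathBuf>, and the role is cloned into each task instead of borrowed.

```rust
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical, simplified stand-ins for the question's types.
#[derive(Debug, Default)]
pub struct Store {
    pub entries: Vec<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum NodeRole {
    Leader,
    Follower,
}

/// Every argument is owned (no &Path, no &NodeRole), so a closure that
/// moves them satisfies the 'static bound of the spawn function.
fn handle(id: usize, store: Arc<Mutex<Store>>, store_path: Arc<PathBuf>, role: NodeRole) -> String {
    store.lock().unwrap().entries.push(format!("conn-{}", id));
    format!("{:?} handled conn-{} for {}", role, id, store_path.display())
}

pub fn run_handlers(n: usize, role: NodeRole) -> (Vec<String>, Arc<Mutex<Store>>) {
    let store = Arc::new(Mutex::new(Store::default()));
    // An owned PathBuf, unlike Arc<&Path>, which borrows a local of main.
    let store_path = Arc::new(PathBuf::from("store.pb"));
    let handles: Vec<_> = (0..n)
        .map(|id| {
            // Each task gets its own Arc clones and its own copy of the role.
            let store = Arc::clone(&store);
            let store_path = Arc::clone(&store_path);
            let role = role.clone();
            thread::spawn(move || handle(id, store, store_path, role))
        })
        .collect();
    let logs: Vec<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    (logs, store)
}
```

In the Tokio version the loop body would have the same shape: clone the Arcs and the role, then pass them by value into tokio::spawn. Note also that the ? on handle_stream disappears, since a spawned task's error is only observable through its JoinHandle.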