在无限循环中异步重新连接客户端到服务器

Dan*_*lva 21 rust rust-tokio

我无法创建尝试连接到服务器的客户端,并且:

  • 如果服务器关闭,它必须在无限循环中再次尝试
  • 如果服务器已启动并且连接成功,则当连接丢失时(即服务器断开客户端连接),客户端必须重新启动无限循环以尝试连接到服务器

这是连接到服务器的代码; 当前连接丢失时,程序退出.我不确定实现它的最佳方法是什么; 也许我必须创建一个Future无限循环?

extern crate tokio_line;
use tokio_line::LineCodec;

fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {                                                                                                                                   
    let remote_addr = "127.0.0.1:9876".parse().unwrap();                                                                                                                                                            
    let tcp = TcpStream::connect(&remote_addr, handle);                                                                                                                                                             

    let client = tcp.and_then(|stream| {                                                                                                                                                                            
        let (sink, from_server) = stream.framed(LineCodec).split();                                                                                                                                                 
        let reader = from_server.for_each(|message| {                                                                                                                                                               
            println!("{}", message);                                                                                                                                                                                
            Ok(())                                                                                                                                                                                                  
        });                                                                                                                                                                                                         

        reader.map(|_| {                                                                                                                                                                                            
            println!("CLIENT DISCONNECTED");                                                                                                                                                                        
            ()                                                                                                                                                                                                      
        }).map_err(|err| err)                                                                                                                                                                                       
    });                                                                                                                                                                                                             

    let client = client.map_err(|_| { panic!()});                                                                                                                                                                   
    Box::new(client)                                                                                                                                                                                                
}                                                                                                                                                                                                                   

fn main() {                                                                                                                                                                                                         
    let mut core = Core::new().unwrap();                                                                                                                                                                            
    let handle = core.handle();                                                                                                                                                                                     
    let client = get_connection(&handle);                                                                                                                                                                           

    let client = client.and_then(|c| {                                                                                                                                                                              
        println!("Try to reconnect");                                                                                                                                                                               
        get_connection(&handle);                                                                                                                                                                                    
        Ok(())                                                                                                                                                                                                      
    });                                                                                                                                                                                                             

    core.run(client).unwrap();                                                                                                                                                                                      
}
Run Code Online (Sandbox Code Playgroud)

添加tokio-line crate:

tokio-line = { git = "https://github.com/tokio-rs/tokio-line" }
Run Code Online (Sandbox Code Playgroud)

aoc*_*via 18

关键问题似乎是:如何使用Tokio实现无限循环?通过回答这个问题,我们可以解决断开连接时无限重新连接的问题.根据我编写异步代码的经验,递归似乎是解决这个问题的直接解决方案.

更新:正如Shepmaster(以及Tokio Gitter的人)所指出的那样,我的原始答案会泄漏内存,因为我们构建了一个在每次迭代中都会增长的期货链.以下是一个新的:

更新的答案:使用 loop_fn

futures板条箱中有一个功能可以完全满足您的需求.它被称为loop_fn.您可以通过将主函数更改为以下内容来使用它:

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = future::loop_fn((), |_| {
        // Run the get_connection function and loop again regardless of its result
        get_connection(&handle).map(|_| -> Loop<(), ()> {
            Loop::Continue(())
        })
    });

    core.run(client).unwrap();
}
Run Code Online (Sandbox Code Playgroud)

该函数类似于for循环,可以根据结果继续或中断get_connection(参见Loop枚举文档).在这种情况下,我们选择始终继续,因此它将无限地保持重新连接.

请注意,get_connection如果出现错误,您的版本将会出现混乱(例如,如果客户端无法连接到服务器).如果您还想在错误后重试,则应删除对此的调用panic!.


旧答案:使用递归

以下是我的回答,万一有人发现它很有趣.

警告:使用下面的代码会导致无限制的内存增长.

制作get_connection无限循环

我们想在get_connection每次客户端断开连接时调用该函数,这正是我们要做的事情(看看之后的注释reader.and_then):

fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
    let remote_addr = "127.0.0.1:9876".parse().unwrap();
    let tcp = TcpStream::connect(&remote_addr, handle);
    let handle_clone = handle.clone();

    let client = tcp.and_then(|stream| {
        let (sink, from_server) = stream.framed(LineCodec).split();
        let reader = from_server.for_each(|message| {
            println!("{}", message);
            Ok(())
        });

        reader.and_then(move |_| {
            println!("CLIENT DISCONNECTED");
            // Attempt to reconnect in the future
            get_connection(&handle_clone)
        })
    });

    let client = client.map_err(|_| { panic!()});
    Box::new(client)
}
Run Code Online (Sandbox Code Playgroud)

请记住,这get_connection是非阻塞的.它只是构建一个Box<Future>.这意味着在递归调用它时,我们仍然不会阻塞.相反,我们获得了一个新的未来,我们可以通过使用链接到前一个and_then.如您所见,这与正常递归不同,因为堆栈在每次迭代时都不会增长.

注意,我们需要克隆handle(参见handle_clone),并将其移动到传递给的闭包中reader.and_then.这是必要的,因为闭包的寿命比函数长(它将在未来包含在我们返回时).

处理错误

您提供的代码不处理客户端无法连接到服务器的情况(也没有任何其他错误).遵循上面显示的相同原则,我们可以通过将结尾更改get_connection为以下内容来处理错误:

let handle_clone = handle.clone();
let client = client.or_else(move |err| {
    // Note: this code will infinitely retry, but you could pattern match on the error
    // to retry only on certain kinds of error
    println!("Error connecting to server: {}", err);
    get_connection(&handle_clone)
});
Box::new(client)
Run Code Online (Sandbox Code Playgroud)

请注意,or_else就像是and_then,但它会对未来产生的错误进行操作.

删除不必要的代码 main

最后,没有必要and_thenmain函数中使用.您可以使用main以下代码替换您:

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = get_connection(&handle);
    core.run(client).unwrap();
}
Run Code Online (Sandbox Code Playgroud)

  • 值得注意的是,这里的*递归*不是在调用自身的函数的经典意义上的递归,因此累积堆栈帧.对`get_connection`的"递归"调用在闭包内部,只有在`get_connection`退出后才会执行. (2认同)
  • @ user4815162342刚添加评论澄清:) (2认同)