我无法创建尝试连接到服务器的客户端,并且:
这是连接到服务器的代码; 当前连接丢失时,程序退出.我不确定实现它的最佳方法是什么; 也许我必须创建一个Future无限循环?
extern crate tokio_line;
use tokio_line::LineCodec;
fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
let remote_addr = "127.0.0.1:9876".parse().unwrap();
let tcp = TcpStream::connect(&remote_addr, handle);
let client = tcp.and_then(|stream| {
let (sink, from_server) = stream.framed(LineCodec).split();
let reader = from_server.for_each(|message| {
println!("{}", message);
Ok(())
});
reader.map(|_| {
println!("CLIENT DISCONNECTED");
()
}).map_err(|err| err)
});
let client = client.map_err(|_| { panic!()});
Box::new(client)
}
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = get_connection(&handle);
let client = client.and_then(|c| {
println!("Try to reconnect");
get_connection(&handle);
Ok(())
});
core.run(client).unwrap();
}
Run Code Online (Sandbox Code Playgroud)
添加tokio-line crate:
tokio-line = { git = "https://github.com/tokio-rs/tokio-line" }
Run Code Online (Sandbox Code Playgroud)
aoc*_*via 18
关键问题似乎是:如何使用Tokio实现无限循环?通过回答这个问题,我们可以解决断开连接时无限重新连接的问题.根据我编写异步代码的经验,递归似乎是解决这个问题的直接解决方案.
更新:正如Shepmaster(以及Tokio Gitter的人)所指出的那样,我的原始答案会泄漏内存,因为我们构建了一个在每次迭代中都会增长的期货链.以下是一个新的:
loop_fnfutures板条箱中有一个功能可以完全满足您的需求.它被称为loop_fn.您可以通过将主函数更改为以下内容来使用它:
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = future::loop_fn((), |_| {
// Run the get_connection function and loop again regardless of its result
get_connection(&handle).map(|_| -> Loop<(), ()> {
Loop::Continue(())
})
});
core.run(client).unwrap();
}
Run Code Online (Sandbox Code Playgroud)
该函数类似于for循环,可以根据结果继续或中断get_connection(参见Loop枚举文档).在这种情况下,我们选择始终继续,因此它将无限地保持重新连接.
请注意,get_connection如果出现错误,您的版本将会出现混乱(例如,如果客户端无法连接到服务器).如果您还想在错误后重试,则应删除对此的调用panic!.
以下是我的回答,万一有人发现它很有趣.
警告:使用下面的代码会导致无限制的内存增长.
get_connection无限循环我们想在get_connection每次客户端断开连接时调用该函数,这正是我们要做的事情(看看之后的注释reader.and_then):
fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
let remote_addr = "127.0.0.1:9876".parse().unwrap();
let tcp = TcpStream::connect(&remote_addr, handle);
let handle_clone = handle.clone();
let client = tcp.and_then(|stream| {
let (sink, from_server) = stream.framed(LineCodec).split();
let reader = from_server.for_each(|message| {
println!("{}", message);
Ok(())
});
reader.and_then(move |_| {
println!("CLIENT DISCONNECTED");
// Attempt to reconnect in the future
get_connection(&handle_clone)
})
});
let client = client.map_err(|_| { panic!()});
Box::new(client)
}
Run Code Online (Sandbox Code Playgroud)
请记住,这get_connection是非阻塞的.它只是构建一个Box<Future>.这意味着在递归调用它时,我们仍然不会阻塞.相反,我们获得了一个新的未来,我们可以通过使用链接到前一个and_then.如您所见,这与正常递归不同,因为堆栈在每次迭代时都不会增长.
注意,我们需要克隆handle(参见handle_clone),并将其移动到传递给的闭包中reader.and_then.这是必要的,因为闭包的寿命比函数长(它将在未来包含在我们返回时).
您提供的代码不处理客户端无法连接到服务器的情况(也没有任何其他错误).遵循上面显示的相同原则,我们可以通过将结尾更改get_connection为以下内容来处理错误:
let handle_clone = handle.clone();
let client = client.or_else(move |err| {
// Note: this code will infinitely retry, but you could pattern match on the error
// to retry only on certain kinds of error
println!("Error connecting to server: {}", err);
get_connection(&handle_clone)
});
Box::new(client)
Run Code Online (Sandbox Code Playgroud)
请注意,or_else就像是and_then,但它会对未来产生的错误进行操作.
main最后,没有必要and_then在main函数中使用.您可以使用main以下代码替换您:
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = get_connection(&handle);
core.run(client).unwrap();
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2603 次 |
| 最近记录: |