我想创建一个简单的 websocket 服务器。我想处理传入的消息并发送响应,但收到错误:
error: captured variable cannot escape `FnMut` closure body
--> src\main.rs:32:27
|
32 | incoming.for_each(|m| async {
| _________________________-_^
| | |
| | inferred to be a `FnMut` closure
33 | | match m {
34 | | // Error here...
35 | | Ok(message) => do_something(message, db, &mut outgoing).await,
36 | | Err(e) => panic!(e)
37 | | }
38 | | }).await;
| |_____^ returns a reference to a captured variable which escapes the closure body
|
= note: `FnMut` closures only have access to their captured variables while they are executing...
= note: ...therefore, they cannot allow references to captured variables to escape
Run Code Online (Sandbox Code Playgroud)
这在 Stack Overflow 上引起了一些关注,但我在代码中没有看到变量转义的任何地方。异步块不会同时运行,所以我没有看到任何问题。此外,我觉得我正在做一些非常简单的事情:我得到一个类型,它允许我将数据发送回客户端,但是当在异步块中使用对其的引用时,它会给出编译错误。仅当我在异步代码中使用outgoingor变量时才会发生该错误。db
这是我的代码(错误在函数中handle_connection):
主程序.rs
use tokio::net::{TcpListener, TcpStream};
use std::net::SocketAddr;
use std::sync::Arc;
use futures::{StreamExt, SinkExt};
use tungstenite::Message;
use tokio_tungstenite::WebSocketStream;
struct DatabaseConnection;
#[tokio::main]
async fn main() -> Result<(), ()> {
listen("127.0.0.1:3012", Arc::new(DatabaseConnection)).await
}
async fn listen(address: &str, db: Arc<DatabaseConnection>) -> Result<(), ()> {
let try_socket = TcpListener::bind(address).await;
let mut listener = try_socket.expect("Failed to bind on address");
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection(stream, addr, db.clone()));
}
Ok(())
}
async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr, db: Arc<DatabaseConnection>) {
let db = &*db;
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await.unwrap();
let (mut outgoing, incoming) = ws_stream.split();
// Adding 'move' does also not work
incoming.for_each(|m| async {
match m {
// Error here...
Ok(message) => do_something(message, db, &mut outgoing).await,
Err(e) => panic!(e)
}
}).await;
}
async fn do_something(message: Message, db: &DatabaseConnection, outgoing: &mut futures_util::stream::SplitSink<WebSocketStream<TcpStream>, Message>) {
// Do something...
// Send some message
let _ = outgoing.send(Message::Text("yay".to_string())).await;
}
Run Code Online (Sandbox Code Playgroud)
Cargo.toml
[dependencies]
futures = "0.3.*"
futures-channel = "0.3.*"
futures-util = "0.3.*"
tokio = { version = "0.2.*", features = [ "full" ] }
tokio-tungstenite = "0.10.*"
tungstenite = "0.10.*"
Run Code Online (Sandbox Code Playgroud)
使用时async move,出现以下错误:
代码
incoming.for_each(|m| async move {
let x = &mut outgoing;
let b = db;
}).await;
Run Code Online (Sandbox Code Playgroud)
错误
use tokio::net::{TcpListener, TcpStream};
use std::net::SocketAddr;
use std::sync::Arc;
use futures::{StreamExt, SinkExt};
use tungstenite::Message;
use tokio_tungstenite::WebSocketStream;
struct DatabaseConnection;
#[tokio::main]
async fn main() -> Result<(), ()> {
listen("127.0.0.1:3012", Arc::new(DatabaseConnection)).await
}
async fn listen(address: &str, db: Arc<DatabaseConnection>) -> Result<(), ()> {
let try_socket = TcpListener::bind(address).await;
let mut listener = try_socket.expect("Failed to bind on address");
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection(stream, addr, db.clone()));
}
Ok(())
}
async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr, db: Arc<DatabaseConnection>) {
let db = &*db;
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await.unwrap();
let (mut outgoing, incoming) = ws_stream.split();
// Adding 'move' does also not work
incoming.for_each(|m| async {
match m {
// Error here...
Ok(message) => do_something(message, db, &mut outgoing).await,
Err(e) => panic!(e)
}
}).await;
}
async fn do_something(message: Message, db: &DatabaseConnection, outgoing: &mut futures_util::stream::SplitSink<WebSocketStream<TcpStream>, Message>) {
// Do something...
// Send some message
let _ = outgoing.send(Message::Text("yay".to_string())).await;
}
Run Code Online (Sandbox Code Playgroud)
Öme*_*den 17
FnMut是一个匿名结构,自从FnMut捕获了&mut outgoing,它就成为这个匿名结构内部的一个字段,并且该字段将在每次调用 时使用FnMut,它可以被多次调用。如果你以某种方式丢失了它(通过返回或移动到另一个作用域等),你的程序将无法使用该字段进行进一步的调用,因为安全性 Rust 编译器不允许你这样做(对于你的两种情况)。
在您的情况下,&mut outgoing我们可以将其用作每次调用的参数,而不是捕获 ,这样我们将保留 的所有权outgoing。您可以使用futures-rs 中的Fold来做到这一点:
incoming
.fold(outgoing, |mut outgoing, m| async move {
match m {
// Error here...
Ok(message) => do_something(message, db, &mut outgoing).await,
Err(e) => panic!(e),
}
outgoing
})
.await;
Run Code Online (Sandbox Code Playgroud)
这可能看起来有点棘手,但它确实有效,我们使用常量accumulator( outgoing),它将用作我们的FnMut.
Playground(感谢@Solomon Ucko 创建可重现的示例)
也可以看看 :