Future 生成器闭包时出错:捕获的变量无法转义“FnMut”闭包主体

NoK*_*Key 9 rust async-await

我想创建一个简单的 websocket 服务器。我想处理传入的消息并发送响应,但收到错误:

error: captured variable cannot escape `FnMut` closure body
  --> src\main.rs:32:27
   |
32 |       incoming.for_each(|m| async {
   |  _________________________-_^
   | |                         |
   | |                         inferred to be a `FnMut` closure
33 | |         match m {
34 | |             // Error here...
35 | |             Ok(message) => do_something(message, db, &mut outgoing).await,
36 | |             Err(e) => panic!(e)
37 | |         }
38 | |     }).await;
   | |_____^ returns a reference to a captured variable which escapes the closure body
   |
   = note: `FnMut` closures only have access to their captured variables while they are executing...
   = note: ...therefore, they cannot allow references to captured variables to escape
Run Code Online (Sandbox Code Playgroud)

这在 Stack Overflow 上引起了一些关注,但我在代码中没有看到变量转义的任何地方。异步块不会同时运行,所以我没有看到任何问题。此外,我觉得我正在做一些非常简单的事情:我得到一个类型,它允许我将数据发送回客户端,但是当在异步块中使用对其的引用时,它会给出编译错误。仅当我在异步代码中使用outgoingor变量时才会发生该错误。db

这是我的代码(错误在函数中handle_connection):

主程序.rs

use tokio::net::{TcpListener, TcpStream};
use std::net::SocketAddr;
use std::sync::Arc;
use futures::{StreamExt, SinkExt};
use tungstenite::Message;
use tokio_tungstenite::WebSocketStream;

struct DatabaseConnection;

#[tokio::main]
async fn main() -> Result<(), ()> {
    listen("127.0.0.1:3012", Arc::new(DatabaseConnection)).await
}

async fn listen(address: &str, db: Arc<DatabaseConnection>) -> Result<(), ()> {
    let try_socket = TcpListener::bind(address).await;
    let mut listener = try_socket.expect("Failed to bind on address");

    while let Ok((stream, addr)) = listener.accept().await {
        tokio::spawn(handle_connection(stream, addr, db.clone()));
    }

    Ok(())
}

async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr, db: Arc<DatabaseConnection>) {
    let db = &*db;
    let ws_stream = tokio_tungstenite::accept_async(raw_stream).await.unwrap();

    let (mut outgoing, incoming) = ws_stream.split();

    // Adding 'move' does also not work
    incoming.for_each(|m| async {
        match m {
            // Error here...
            Ok(message) => do_something(message, db, &mut outgoing).await,
            Err(e) => panic!(e)
        }
    }).await;
}

async fn do_something(message: Message, db: &DatabaseConnection, outgoing: &mut futures_util::stream::SplitSink<WebSocketStream<TcpStream>, Message>) {
    // Do something...

    // Send some message
    let _ = outgoing.send(Message::Text("yay".to_string())).await;
}
Run Code Online (Sandbox Code Playgroud)

Cargo.toml

[dependencies]
futures = "0.3.*"
futures-channel = "0.3.*"
futures-util = "0.3.*"
tokio = { version = "0.2.*", features = [ "full" ] }
tokio-tungstenite = "0.10.*"
tungstenite = "0.10.*"
Run Code Online (Sandbox Code Playgroud)

使用时async move,出现以下错误:

代码

incoming.for_each(|m| async move {
    let x = &mut outgoing;
    let b = db;
}).await;
Run Code Online (Sandbox Code Playgroud)

错误

use tokio::net::{TcpListener, TcpStream};
use std::net::SocketAddr;
use std::sync::Arc;
use futures::{StreamExt, SinkExt};
use tungstenite::Message;
use tokio_tungstenite::WebSocketStream;

struct DatabaseConnection;

#[tokio::main]
async fn main() -> Result<(), ()> {
    listen("127.0.0.1:3012", Arc::new(DatabaseConnection)).await
}

async fn listen(address: &str, db: Arc<DatabaseConnection>) -> Result<(), ()> {
    let try_socket = TcpListener::bind(address).await;
    let mut listener = try_socket.expect("Failed to bind on address");

    while let Ok((stream, addr)) = listener.accept().await {
        tokio::spawn(handle_connection(stream, addr, db.clone()));
    }

    Ok(())
}

async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr, db: Arc<DatabaseConnection>) {
    let db = &*db;
    let ws_stream = tokio_tungstenite::accept_async(raw_stream).await.unwrap();

    let (mut outgoing, incoming) = ws_stream.split();

    // Adding 'move' does also not work
    incoming.for_each(|m| async {
        match m {
            // Error here...
            Ok(message) => do_something(message, db, &mut outgoing).await,
            Err(e) => panic!(e)
        }
    }).await;
}

async fn do_something(message: Message, db: &DatabaseConnection, outgoing: &mut futures_util::stream::SplitSink<WebSocketStream<TcpStream>, Message>) {
    // Do something...

    // Send some message
    let _ = outgoing.send(Message::Text("yay".to_string())).await;
}
Run Code Online (Sandbox Code Playgroud)

Öme*_*den 17

FnMut是一个匿名结构,自从FnMut捕获了&mut outgoing,它就成为这个匿名结构内部的一个字段,并且该字段将在每次调用 时使用FnMut,它可以被多次调用。如果你以某种方式丢失了它(通过返回或移动到另一个作用域等),你的程序将无法使用该字段进行进一步的调用,因为安全性 Rust 编译器不允许你这样做(对于你的两种情况)。

在您的情况下,&mut outgoing我们可以将其用作每次调用的参数,而不是捕获 ,这样我们将保留 的所有权outgoing。您可以使用futures-rs 中的Fold来做到这一点:

incoming
    .fold(outgoing, |mut outgoing, m| async move {
        match m {
            // Error here...
            Ok(message) => do_something(message, db, &mut outgoing).await,
            Err(e) => panic!(e),
        }

        outgoing
    })
    .await;
Run Code Online (Sandbox Code Playgroud)

这可能看起来有点棘手,但它确实有效,我们使用常量accumulator( outgoing),它将用作我们的FnMut.

Playground(感谢@Solomon Ucko 创建可重现的示例)

也可以看看 :