为什么`tokio::main`会报“处理时检测到循环”的错误?

cre*_*lem 3 udp rust async-await rust-tokio

我正在使用 Tokio 和async/.await来创建一个 UDP 服务器,我可以在其中以异步方式接收和发送数据。

SendHalf我的UDP套接字的跨多个任务共享。为此,我正在使用Arc<Mutex<SendHalf>>. 这就是为什么Arc<Mutex<_>>存在。

use tokio::net::UdpSocket;
use tokio::net::udp::SendHalf;
use tokio::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::net::SocketAddr;

struct Packet {
    sender: Arc<Mutex<SendHalf>>,
    buf: [u8; 512],
    addr: SocketAddr,
}

#[tokio::main]
async fn main() {
    let server = UdpSocket::bind(("0.0.0.0", 44667)).await.unwrap();
    let (mut server_rx, mut server_tx) = server.split();
    let sender = Arc::new(Mutex::new(server_tx));
    let (mut tx, mut rx) = mpsc::channel(100);

    tokio::spawn(async move {
        loop {
            let mut buffer = [0; 512];
            let (_, src) = server_rx.recv_from(&mut buffer).await.unwrap();
            let packet = Packet {
                sender: sender.clone(),
                buf: buffer,
                addr: src,
            };
            tx.send(packet).await;
        }
    });

    while let Some(packet) = rx.recv().await {
        tokio::spawn(async move {
            let mut socket = packet.sender.lock().unwrap();
            socket.send_to(&packet.buf, &packet.addr).await.unwrap();
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

这里也是一个游乐场

我正面临一个我不明白的编译器错误:

use tokio::net::UdpSocket;
use tokio::net::udp::SendHalf;
use tokio::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::net::SocketAddr;

struct Packet {
    sender: Arc<Mutex<SendHalf>>,
    buf: [u8; 512],
    addr: SocketAddr,
}

#[tokio::main]
async fn main() {
    let server = UdpSocket::bind(("0.0.0.0", 44667)).await.unwrap();
    let (mut server_rx, mut server_tx) = server.split();
    let sender = Arc::new(Mutex::new(server_tx));
    let (mut tx, mut rx) = mpsc::channel(100);

    tokio::spawn(async move {
        loop {
            let mut buffer = [0; 512];
            let (_, src) = server_rx.recv_from(&mut buffer).await.unwrap();
            let packet = Packet {
                sender: sender.clone(),
                buf: buffer,
                addr: src,
            };
            tx.send(packet).await;
        }
    });

    while let Some(packet) = rx.recv().await {
        tokio::spawn(async move {
            let mut socket = packet.sender.lock().unwrap();
            socket.send_to(&packet.buf, &packet.addr).await.unwrap();
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

为什么我的代码会产生一个循环?为什么调用需要处理main

该错误更详细的含义是什么?我想了解这是怎么回事。

edw*_*rdw 5

根据tokio文档,当涉及到使用!Send任务的价值时

!Send在调用之间保持一个值.await将导致不友好的编译错误消息,类似于:

[... some type ...] cannot be sent between threads safely

或者:

error[E0391]: cycle detected when processing main

您正在目睹这个确切的错误。当你锁定一个Mutex

pub fn lock(&self) -> LockResult<MutexGuard<T>>
Run Code Online (Sandbox Code Playgroud)

它返回 a MutexGuard,即!Send

impl<'_, T: ?Sized> !Send for MutexGuard<'_, T>
Run Code Online (Sandbox Code Playgroud)

这编译得很好:

#[tokio::main]
async fn main() {
    ...

    while let Some(packet) = rx.recv().await {
        let mut socket = packet.sender.lock().unwrap();
        socket.send_to(&packet.buf, &packet.addr).await.unwrap();
    }
}
Run Code Online (Sandbox Code Playgroud)