不同线程内两个 tokio 运行时之间的消息传递

Lex*_*Lex 8 multithreading rust rust-tokio

我遇到了这个问题中描述的问题:如何在另一个 Tokio 运行时中创建 Tokio 运行时,而不会收到错误“无法从运行时内启动运行时”?

一些好的 Rust crate 没有异步执行器。我决定将所有此类库调用放在一个能够容忍此类操作的线程中。另一个线程应该能够使用 发送非闪烁消息tokio::channel

我编写了一个演示站来测试实施选项。在每个运行时内部进行调用tokio::spawn是为了了解 tokio 运行时和处理程序中的更多细节 - 这是问题的一部分。

问题。 如果我还有什么误解,请纠正我。
有两个 tokio 运行时。每个都在自己的线程中启动。tokio::spawn在第一个运行时内部调用会first_runtime()生成任务。tokio::spawn内部调用会second_runtime()在第二个运行时生成任务。tokio::channel这两个任务之间有一个。如果通道tx.send(...).await缓冲区未满,即使接收线程被调用阻塞,调用也不会阻塞发送线程thread::sleep()
我一切都做对了吗?这段代码的输出告诉我我是对的,但我需要确认我的推理。

use std::thread;
use std::time::Duration;
use tokio::sync::mpsc::{Sender, Receiver, channel}; // 1.12.0


#[tokio::main(worker_threads = 1)]
#[allow(unused_must_use)]
async fn first_runtime(tx: Sender<String>) {
    thread::sleep(Duration::from_secs(1));
    println!("first thread woke up");
    tokio::spawn(async move {
        for msg_id in 0..10 {
            if let Err(e) = tx.send(format!("message {}", msg_id)).await {
                eprintln!("[ERR]: {}", e);
            } else {
                println!("message {} send", msg_id);
            }
        }
    }).await;
    println!("first thread finished");
}


#[tokio::main(worker_threads = 1)]
#[allow(unused_must_use)]
async fn second_runtime(mut rx: Receiver<String>) {
    thread::sleep(Duration::from_secs(3));
    println!("second thread woke up");
    tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            println!("{} received", msg);
        }
    }).await;
    println!("second thread finished");
}


fn main() {
    let (tx, rx) = channel::<String>(5);
    thread::spawn(move || { first_runtime(tx); });
    second_runtime(rx);
}

Run Code Online (Sandbox Code Playgroud)