如何从 Tokio 的非主线程运行异步任务?

ibs*_*bse 12 asynchronous rust rust-tokio

use std::thread;
use tokio::task; // 0.3.4

#[tokio::main]
async fn main() {
    thread::spawn(|| {
        task::spawn(async {
            println!("123");
        });
    })
    .join();
}
Run Code Online (Sandbox Code Playgroud)

编译时我收到警告:

warning: unused `std::result::Result` that must be used
  --> src/main.rs:6:5
   |
6  | /     thread::spawn(|| {
7  | |         task::spawn(async {
8  | |             println!("123");
9  | |         });
10 | |     })
11 | |     .join();
   | |____________^
   |
   = note: `#[warn(unused_must_use)]` on by default
   = note: this `Result` may be an `Err` variant, which should be handled
Run Code Online (Sandbox Code Playgroud)

执行时出现错误:

thread '<unnamed>' panicked at 'must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`', src/main.rs:7:9
Run Code Online (Sandbox Code Playgroud)

小智 10

我有一个作业处理应用程序,它公开一个 Web API 来添加作业并处理它们,但 API 请求不应等待作业完成(可能需要一段时间)。我使用服务器发送事件来广播作业结果。这意味着主 API 服务器正在mainwith内部执行#[tokio::main],但是我应该在哪里运行作业执行器?在作业执行器中,我将有很多等待:比如下载之类的事情。它们会干扰 Web API 服务器。关键问题是我如何并行启动两个执行?

在这种情况下,您需要创建一个单独的线程,在 thread::spawn其中创建一个 Tokio 执行器。您得到的错误是,在第二个线程内,没有 Tokio 执行程序(运行时)。您需要手动创建一个并告诉它运行您的任务。更简单的方法是使用RuntimeAPI:

use tokio::runtime::Runtime; // 0.2.23

// Create the runtime
let rt = Runtime::new().unwrap();

// Spawn a future onto the runtime
rt.spawn(async {
    println!("now running on a worker thread");
});
Run Code Online (Sandbox Code Playgroud)

在您的主线程中,执行器已经可以使用#[tokio::main]. 在添加此属性之前,运行时是手动创建的。

如果您想坚持异步/等待哲学,您可以使用join

use tokio; // 0.2.23

#[tokio::main]
async fn main() {
    let (_, _) = tokio::join!(start_server_listener(), start_job_processor());
}
Run Code Online (Sandbox Code Playgroud)

这就是为什么大多数答案都质疑你的方法。虽然非常罕见,但我相信在某些情况下,您希望异步运行时位于另一个线程上,同时还具有手动配置运行时的好处。


She*_*ter 8

关键是你需要得到一个 Tokio Handle。这是对 a 的引用Runtime,它允许您从运行时外部生成异步任务。

使用时#[tokio::main],获取 a 的最简单方法Handle产生另一个线程之前通过 via然后将句柄提供给每个可能想要启动异步任务的线程:Handle::current

use std::thread;
use tokio::runtime::Handle; // 0.3.4

#[tokio::main]
async fn main() {
    let threads: Vec<_> = (0..3)
        .map(|thread_id| {
            let handle = Handle::current();

            thread::spawn(move || {
                eprintln!("Thread {} started", thread_id);

                for task_id in 0..3 {
                    handle.spawn(async move {
                        eprintln!("Thread {} / Task {}", thread_id, task_id);
                    });
                }

                eprintln!("Thread {} finished", thread_id);
            })
        })
        .collect();

    for t in threads {
        t.join().expect("Thread panicked");
    }
}
Run Code Online (Sandbox Code Playgroud)

您还可以创建一个全球性的,可变单身Mutex<Option<Handle>>,它初始化None,然后将其设置为Some早在你的tokio::main功能。然后,您可以获取该全局变量,解开它,并Handle在需要时克隆它:

use once_cell::sync::Lazy; // 1.5.2

static HANDLE: Lazy<Mutex<Option<Handle>>> = Lazy::new(Default::default);
Run Code Online (Sandbox Code Playgroud)
*HANDLE.lock().unwrap() = Some(Handle::current());
Run Code Online (Sandbox Code Playgroud)
let handle = HANDLE.lock().unwrap().as_ref().unwrap().clone();
Run Code Online (Sandbox Code Playgroud)

也可以看看:


归档时间:

查看次数:

6008 次

最近记录:

5 年,1 月 前