如何在 Tokio 运行时上下文中从异步方法调用的非异步方法中等待 future?

gro*_*wse 24 future rust async-await rust-tokio

我正在使用 Tokio 1.1 来做异步事情。我有一个async mainwith #[tokio::main],所以我已经在使用运行时进行操作。

main调用一个非异步方法,我希望await在未来(具体来说,我正在从数据融合数据帧收集)。这种非异步方法具有由特征规定的签名,该特征返回结构体,而不是Future<Struct>. 据我所知,我无法将其标记为异步。

如果我尝试打电话df.collect().await;,我会得到

只允许在async函数和块内部

来自编译器的错误,指出我await在其中调用的方法不是async.

如果我尝试block_on从一个新的运行时开始,未来是这样的:

tokio::runtime::Builder::new_current_thread()
    .build()
    .unwrap()
    .block_on(df.collect());
Run Code Online (Sandbox Code Playgroud)

我遇到运行时恐慌:

无法从运行时内部启动运行时。发生这种情况是因为函数(如block_on)试图在当前线程用于驱动异步任务时阻止该线程。

如果我尝试futures::executor::block_on(df.collect()).unwrap();,我会遇到新的运行时恐慌:

“当前未在 Tokio 0.2.x 运行时上运行。”

这很奇怪,因为我使用的是 Tokio v1.1。

这感觉比应该的更难。我处于异步上下文中,感觉编译器应该知道这一点并允许我.await从方法内调用 - 唯一的代码路径从块内调用此方法async。有没有一种简单的方法可以做到我所缺少的?

Ibr*_*med 15

我处于异步上下文中,感觉编译器应该知道这一点并允许我从方法内调用 .await

无论您是否在运行时的上下文中,基本上都不可能进入await同步函数的内部。await被转换为屈服点,并且async函数被转换为利用这些屈服点来执行异步计算的状态机。如果不将您的函数标记为async,则这种转换是不可能的。

如果我正确理解你的问题,你有以下代码:

#[tokio::main]
async fn main() {
    let foo = Foo {};
    foo.bar()
}

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        df.collect().await
    }
}
Run Code Online (Sandbox Code Playgroud)

问题是您无法df.collect从内部等待bar,因为它没有标记为async。如果您可以修改 的签名Trait,那么您可以使用如何在特征中定义异步方法?Trait::bar中提到的解决方法创建异步方法。。

如果您无法更改 的签名Trait,那么您就有问题了。异步函数应该花费很长时间才到达.await. 正如在 future-rs 中封装阻塞 I/O 的最佳方法是什么?中所解释的那样。spawn_blocking,您可以在转换为非异步代码时使用:

#[tokio::main]
async fn main() {
    let foo = Foo {};
    tokio::task::spawn_blocking(move || foo.bar()).await
}

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        df.collect().await
    }
}
Run Code Online (Sandbox Code Playgroud)

df.collect现在您需要一种无需等待即可运行完成的方法。您提到您尝试创建嵌套运行时来解决此问题:

如果我尝试从新的运行时阻止未来......我会感到恐慌

但是,tokio 不允许您创建嵌套运行时。您可以创建一个新的独立运行时,如如何在另一个 Tokio 运行时中创建 Tokio 运行时中所述。但是,生成嵌套运行时的效率很低。

您可以获取当前运行时的句柄,而不是生成新的运行时:

let handle = Handle::current();
Run Code Online (Sandbox Code Playgroud)

输入运行时上下文:

handle.enter();
Run Code Online (Sandbox Code Playgroud)

然后运行 ​​future 来完成futures::executor::block_on

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        let handle = Handle::current();
        handle.enter();
        futures::executor::block_on(df.collect())
    }
}
Run Code Online (Sandbox Code Playgroud)

输入 tokio 运行时上下文将解决您之前遇到的错误:

如果我尝试 futures::executor::block_on(df.collect()).unwrap();,我会遇到新的运行时恐慌not currently running on a Tokio 0.2.x runtime

我强烈建议您尽可能避免这样做。最佳解决方案是标记Trait::barasyncawait正常。任何其他解决方案(包括上面提到的解决方案)都涉及阻塞当前线程,直到给定的未来完成。

感谢@AliceRyhl 的解释

  • 谢谢 - 我不知道这些都是可能的。最后,你是对的——重构异步是正确的做法。 (3认同)