Hei*_*nzi 23 asynchronous rust async-await rust-tokio rust-futures
我面临着一种情况,我需要从对象的放置处理程序运行异步代码。整个应用程序在 tokio 异步上下文中运行,因此我知道 drop 处理程序是使用活动的 tokio 运行时调用的,但不幸的是 drop 本身是一个同步函数。
理想情况下,我想要一个适用于多线程和当前线程运行时的解决方案,但如果不存在,那么我可以使用阻止删除线程并依赖其他线程来驱动的解决方案期货。
我考虑了多种选择,但我不确定哪种方法最好,也不了解它们的权衡。对于这些示例,我们假设我的类有一个async terminate(&mut self)我希望从 调用的函数drop()。
struct MyClass;
impl MyClass {
async fn terminate(&mut self) {}
}
Run Code Online (Sandbox Code Playgroud)
选项1:tokio::runtime::Handle::block_on
impl Drop for MyClass {
fn drop(&mut self) {
tokio::runtime::Handle::current().block_on(self.terminate());
}
}
Run Code Online (Sandbox Code Playgroud)
这似乎是最直接的方法,但不幸的是它会引起恐慌
Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
Run Code Online (Sandbox Code Playgroud)
看操场
我对此有点困惑,因为我认为Handle::block_on会使用当前正在运行的运行时,但它似乎试图启动一个新的运行时?这里发生了什么?
另外,根据 的文档Handle::block_on,这不能驱动 IO 线程。所以我猜想阻塞这个线程是有风险的——如果太多的对象同时被破坏,每个对象阻塞一个线程,并且这些 futures 等待 IO 工作,那么这就会死锁。
选项2:futures::executor::block_on
Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
Run Code Online (Sandbox Code Playgroud)
看操场
这似乎工作正常。如果我理解正确的话,那么它会在当前线程上生成一个新的非 tokio 执行器,并让该线程驱动未来。这是一个问题吗?这是否会导致正在运行的 tokio 执行器和新的 future 执行器之间发生冲突?
另外,这实际上可以驱动 IO 线程,避免选项 1 的问题吗?或者那些 IO 线程是否仍然在 tokio 执行器上等待?
选项 3:tokio::task::spawn与futures::executor::block_on
impl Drop for MyClass {
fn drop(&mut self) {
futures::executor::block_on(self.terminate());
}
}
Run Code Online (Sandbox Code Playgroud)
看操场
这应该让 tokio 运行时驱动终止 future,而 futures 运行时仅阻止当前线程等待 tokio 运行时完成?这是否比选项 2 更安全并且导致运行时之间的冲突更少?不幸的是,这遇到了我无法弄清楚的终生问题:
error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/main.rs:8:44
|
7 | fn drop(&mut self) {
| --------- this data with an anonymous lifetime `'_`...
8 | let task = tokio::task::spawn(self.terminate());
| ---- ^^^^^^^^^
| |
| ...is used here...
|
note: ...and is required to live as long as `'static` here
--> src/main.rs:8:20
|
8 | let task = tokio::task::spawn(self.terminate());
| ^^^^^^^^^^^^^^^^^^
note: `'static` lifetime requirement introduced by this bound
--> /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:28
|
127 | T: Future + Send + 'static,
| ^^^^^^^
Run Code Online (Sandbox Code Playgroud)
我也尝试修复此问题LocalSet,但无法使其正常工作。有办法让这个工作吗?
选项 3b
然而,如果我按值terminate()获取self并包装MyClass到Wrapper. 不太漂亮,但可能比选项 2 更好,因为它使用 tokio 运行时来驱动未来?
impl Drop for MyClass {
fn drop(&mut self) {
let task = tokio::task::spawn(self.terminate());
futures::executor::block_on(task);
}
}
Run Code Online (Sandbox Code Playgroud)
看操场
这是一个好方法吗?tokio 运行时驱动下降未来真的很重要,还是更简单的选项 2 更好?有什么方法可以让选项 3b 更漂亮/更容易使用吗?
选项 4:后台任务
我在这里找到了这个选项:https ://stackoverflow.com/a/68851788/829568 它基本上在对象的构造函数中生成一个后台任务,等待触发器并在触发时运行异步放置代码。然后 drop 实现会触发它并运行一个繁忙的等待循环,直到完成。
这看起来过于复杂,而且比这里的其他选项更容易出错。或者这实际上是最好的解决方案?
关于耗尽工作线程的附带问题
除了选项 1 之外,所有这些选项都会阻止 tokio 工作线程等待异步删除完成。在多线程运行时,这在大多数情况下都会顺利进行,但理论上,如果多个析构函数并行运行,则可能会锁定所有工作线程 - 并且 IIUC 那么我们将陷入死锁,没有线程取得进展。选项 1 似乎更好一些,但block_on文档说它只能驱动非 IO 期货。因此,如果太多析构函数执行 IO 工作,它仍然可能会锁定。有没有办法告诉 tokio 将工作线程数量增加一个?如果我们对每个阻塞的线程都这样做,是否可以避免这个问题?
选项 5:新线程中的新运行时
error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/main.rs:8:44
|
7 | fn drop(&mut self) {
| --------- this data with an anonymous lifetime `'_`...
8 | let task = tokio::task::spawn(self.terminate());
| ---- ^^^^^^^^^
| |
| ...is used here...
|
note: ...and is required to live as long as `'static` here
--> src/main.rs:8:20
|
8 | let task = tokio::task::spawn(self.terminate());
| ^^^^^^^^^^^^^^^^^^
note: `'static` lifetime requirement introduced by this bound
--> /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:28
|
127 | T: Future + Send + 'static,
| ^^^^^^^
Run Code Online (Sandbox Code Playgroud)
看操场
这似乎有效,并尝试通过在新线程中的新运行时运行删除任务来避免阻塞工作线程的问题。希望这个新线程能够驱动 IO 任务。但这真的能彻底解决问题吗?如果 drop 任务依赖于主 tokio 执行器上运行的 IO 任务怎么办?我认为这仍然有可能导致程序无限期锁定。
选项1:tokio::runtime::Handle::block_on
该block_on函数是 tokio 运行时的入口点;#[tokio::main]例如,当您使用 进行注释时,它会运行。如果这可行的话,tokio 将生成一个全新的运行时,并在其完成时阻止当前线程。你绝对不想要这样!
选项2:futures::executor::block_on
这可以工作,但是会阻塞,因此并不理想,因为该线程上的其他任务在完成之前都无法取得进展。
选项 3:tokio::task::spawn与futures::executor::block_on
你不需要block_on这里;生成任务将运行该任务直至完成。无需阻塞任何线程!这就是我要做的。但是,您注意到一个问题,如果编译器允许这样做,则会导致内存错误。让我们假装我们能做到:
foo: MyClass。foo被丢弃。foo我们生成一个带有run引用的任务foo.terminate()。foo不再存在,但我们有一个引用它的后台任务!最好的情况是段错误。那么我们该如何避免呢?这就引出了选项 3b。
选项 3b
我认为这是一个很好的解决方案(同样,没有block_on)。
如果MyClass有一个廉价的default()实现,那么您不需要包装器,并且可以将其替换为默认值。我的第一个想法是调用std::mem::take,这会在其位置保留默认值,但这遇到了问题;你最终会遇到堆栈溢出调用 drop。因此,我们可以使用一个标志来指示它已被删除:
#[derive(Default)]
struct MyClass {
dropped: bool,
}
impl MyClass {
async fn terminate(&mut self) {
println!("Terminating");
}
}
impl Drop for MyClass {
fn drop(&mut self) {
if !self.dropped {
let mut this = MyClass::default();
std::mem::swap(&mut this, self);
this.dropped = true;
tokio::spawn(async move { this.terminate().await });
}
}
}
Run Code Online (Sandbox Code Playgroud)
如果您发现自己非常想实现这一点,您可以创建一个Dropper包装器以用于各种类型:
#[async_trait::async_trait]
pub trait AsyncDrop {
async fn async_drop(&mut self);
}
#[derive(Default)]
pub struct Dropper<T: AsyncDrop + Default + Send + 'static> {
dropped: bool,
inner: T,
}
impl<T: AsyncDrop + Default + Send + 'static> Dropper<T> {
pub fn new(inner: T) -> Self {
Self {
dropped: false,
inner,
}
}
}
impl<T: AsyncDrop + Default + Send + 'static> Drop for Dropper<T> {
fn drop(&mut self) {
if !self.dropped {
let mut this = Dropper::default();
std::mem::swap(&mut this, self);
this.dropped = true;
tokio::spawn(async move {
this.inner.async_drop().await;
});
}
}
}
Run Code Online (Sandbox Code Playgroud)
选项 4:后台任务
另一个答案已经涵盖了这一点:/sf/answers/5021902721/
选项 5:新线程中的新运行时
我绝对不会在每次你想要删除时都生成一个新的运行时;这是非常严厉的。
通过使用作用域线程,您也无法解决阻塞问题。线程将在作用域的末尾立即加入,并阻塞直到运行时完成。