每次调用 Future::poll 时都需要注册一个新的唤醒器吗?

fro*_*net 1 future rust async-await rust-tokio

我正在制作自己的通道实现,但std::task::Context没有明确唤醒器是如何生成的。

我的假代码:

struct MyAtomicWaker {
    lock: SpinLock,
    is_waked: AtomicBool,
    waker: std::task::Waker, 
}

struct WeakAtomicWaker (Weak<MyAtomicWaker>)

impl MyAtomicWaker {
    fn is_waked(&self) -> bool {}
    fn weak(self: Arc<MyAtomicWaker>) -> WeakAtomicWaker;
    fn cancel(&self) {}  // nullify WeakAtomicWaker, means the waker is not waked by a future
}

impl WeakAtomicWaker {
    fn wake(self) {}  // upgrade to arc and can wake only once when waker not cancelled
}

struct ReceiveFuture<T> {
    waker: Option<Arc<MyAtomicWaker>>,
}

impl<T> Drop for ReceiveFuture<T> {
    fn drop(&mut self) {
        if let Some(waker) = self.waker.take() {  waker.cancel(); }
    }
}

impl<T> Future for ReceiveFuture<T> {
    type Output = Result<(), SendError<T>>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        let _self = self.get_mut();
        if _self.waker.is_none() {
          let my_waker = _self.reg_waker(ctx.waker().clone()); // wrap the waker with Arc, store it inside _self, and send the weak ref to other side of channel
          _self.waker.replace(my_waker);
        }

        // do some polling
        match _self.recv.try_recv() {
            Ok(item)=>{ 
                if let Some(waker) = _self.waker.take() {
                    waker.cancel();
                }
                return Poll::Ready(item);  //canncel my waker and ready
            },
            Err(TryRecvError)=>{
               if let Some(waker) = _self.waker.as_ref() {
                   if waker.is_wake() { // the waker is triggered but it's a false alarm, should make a new one.
                       let my_waker = self.reg_waker(ctx.waker().clone()); 
                       _self.waker.replace(my_waker);
                   } else { // the waker has not trigger, we do not have to make a new one ?
                   }
               }
               return Poll::Pending;    
          },
          Err(...)
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

poll()每次调用都需要注册一个新的唤醒器吗?在我的代码中,由于不同 future 的组合,存在大量超时和循环选择。

我有一个在 Playground 上有效的小实验,但我不确定它在各种设置中是否始终适用于 Tokio 和 async-std 。

在我的生产代码中,我注册一个新唤醒器并在每次调用中取消旧唤醒器poll()。我不知道只在第一次注册唤醒器并在下一次轮询中重用它是否安全。

给出以下顺序:

  1. f.reg_waker(waker1) f.poll()得到Poll::Pending
  2. 由于其他未来选择而组合未​​来(或未来::选择)唤醒,但waker1尚未唤醒。 f.poll()得到Poll::Pending
  3. 一些局外人调用waker1.wake();

之后 就waker1.wake()保证能醒来吗?f


我问这个是因为:

  1. 我有一个Stream多路复用多个接收通道

  2. 我的 MPMC 和 MPSC 通道实现是无锁的。复用选择内的某些通道可能用作关闭通知通道,很少收到消息。当我多次轮询它时(比如一百万次),它将导致一百万个唤醒程序被扔到另一侧(这看起来像内存泄漏)。在没有锁的情况下取消同一未来产生的先前唤醒程序的逻辑比有锁的实现更复杂

  3. 由于这些原因,我有一个唤醒取消的解决方案,会导致公平性问题,需要尽可能避免

我对书中的内容或 API 法律的声明不感兴趣;我只对低级别的实现方式感兴趣。显示一些代码为什么它有效或为什么它不起作用会很有帮助。我编写代码来实现产品;如果有必要,我会坚持指定的依赖项或进行一些黑客攻击以完成工作,直到我有更好的方法。

She*_*ter 5

是的,每次都需要重新设置唤醒器。Future::poll状态(强调我的):

请注意,在多次调用 时poll只有WakerContext传递到最近一次的调用才应安排接收唤醒。

也可以看看:

  • 这里要非常明确的是:答案是明确的:“不,你不能那样做”。如果您很难通过详细的技术解释来理解这一点,那只是因为 Shep 提供了一堆技术原因,很有帮助。它并没有改变这里的基本事实:**没有**方法可以安全地做到这一点,因为它“直接违反”了 API 契约。 (3认同)
  • 您阅读的 API 定义与我不同吗?API 明确指出唤醒器必须来自最近一次调用“poll”时使用的上下文。因为您认为这是一个好主意而保留旧唤醒器,并使用该唤醒器而不是最近上下文中的唤醒器,这是违反 API 契约的行为。如果您可以证明上下文与之前的调用相同,并且它提供相同的唤醒器,那么您可以使用该唤醒器。但没有办法绕过检查这些条件,所以你最好每次都得到唤醒器。 (3认同)
  • @frostyplanet 重申一下,如果你故意破坏 API 契约,那么你测试多少并不重要。任何符合要求的执行者/未来都可以依赖 API 记录的行为,并且您的代码将停止工作。您必须审查组合器和执行器的每个排列的每个可能版本,包括那些尚未发布的版本! (2认同)