fro*_*net 1 future rust async-await rust-tokio
我正在制作自己的通道实现,但std::task::Context没有明确唤醒器是如何生成的。
我的假代码:
struct MyAtomicWaker {
lock: SpinLock,
is_waked: AtomicBool,
waker: std::task::Waker,
}
struct WeakAtomicWaker (Weak<MyAtomicWaker>)
impl MyAtomicWaker {
fn is_waked(&self) -> bool {}
fn weak(self: Arc<MyAtomicWaker>) -> WeakAtomicWaker;
fn cancel(&self) {} // nullify WeakAtomicWaker, means the waker is not waked by a future
}
impl WeakAtomicWaker {
fn wake(self) {} // upgrade to arc and can wake only once when waker not cancelled
}
struct ReceiveFuture<T> {
waker: Option<Arc<MyAtomicWaker>>,
}
impl<T> Drop for ReceiveFuture<T> {
fn drop(&mut self) {
if let Some(waker) = self.waker.take() { waker.cancel(); }
}
}
impl<T> Future for ReceiveFuture<T> {
type Output = Result<(), SendError<T>>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let _self = self.get_mut();
if _self.waker.is_none() {
let my_waker = _self.reg_waker(ctx.waker().clone()); // wrap the waker with Arc, store it inside _self, and send the weak ref to other side of channel
_self.waker.replace(my_waker);
}
// do some polling
match _self.recv.try_recv() {
Ok(item)=>{
if let Some(waker) = _self.waker.take() {
waker.cancel();
}
return Poll::Ready(item); //canncel my waker and ready
},
Err(TryRecvError)=>{
if let Some(waker) = _self.waker.as_ref() {
if waker.is_wake() { // the waker is triggered but it's a false alarm, should make a new one.
let my_waker = self.reg_waker(ctx.waker().clone());
_self.waker.replace(my_waker);
} else { // the waker has not trigger, we do not have to make a new one ?
}
}
return Poll::Pending;
},
Err(...)
}
}
}
Run Code Online (Sandbox Code Playgroud)
poll()每次调用都需要注册一个新的唤醒器吗?在我的代码中,由于不同 future 的组合,存在大量超时和循环选择。
我有一个在 Playground 上有效的小实验,但我不确定它在各种设置中是否始终适用于 Tokio 和 async-std 。
在我的生产代码中,我注册一个新唤醒器并在每次调用中取消旧唤醒器poll()。我不知道只在第一次注册唤醒器并在下一次轮询中重用它是否安全。
给出以下顺序:
f.reg_waker(waker1)
f.poll()得到Poll::Pendingf.poll()得到Poll::Pending之后 就waker1.wake()保证能醒来吗?f
我问这个是因为:
我有一个Stream多路复用多个接收通道
我的 MPMC 和 MPSC 通道实现是无锁的。复用选择内的某些通道可能用作关闭通知通道,很少收到消息。当我多次轮询它时(比如一百万次),它将导致一百万个唤醒程序被扔到另一侧(这看起来像内存泄漏)。在没有锁的情况下取消同一未来产生的先前唤醒程序的逻辑比有锁的实现更复杂
由于这些原因,我有一个唤醒取消的解决方案,会导致公平性问题,需要尽可能避免
我对书中的内容或 API 法律的声明不感兴趣;我只对低级别的实现方式感兴趣。显示一些代码为什么它有效或为什么它不起作用会很有帮助。我编写代码来实现产品;如果有必要,我会坚持指定的依赖项或进行一些黑客攻击以完成工作,直到我有更好的方法。
是的,每次都需要重新设置唤醒器。Future::poll状态(强调我的):
请注意,在多次调用 时
poll,只有Waker从Context传递到最近一次的调用才应安排接收唤醒。
也可以看看:
| 归档时间: |
|
| 查看次数: |
2220 次 |
| 最近记录: |