Art*_*nko 5 future rust rust-tokio
我有一个循环,我在其中做一些工作并使用 发送结果Sender。这项工作需要时间,如果失败我需要重试。当我重试时,接收器可能已关闭,我的重试将是浪费时间。因此,我需要一种方法来检查是否Receiver可用而不发送消息。
在理想的世界中,我希望我的代码在伪代码中看起来像这样:
let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
// do som stuff with rx and drop it after some time
rx.recv(...).await;
});
let mut attempts = 0;
loop {
if tx.is_closed() {
break;
}
if let Ok(result) = do_work().await {
attempts = 0;
let _ = tx.send(result).await;
} else {
if attempts >= 10 {
break;
} else {
attempts += 1;
continue;
}
}
};
Run Code Online (Sandbox Code Playgroud)
问题是Sender没有is_closed方法。它确实有pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>>,但我不知道Context是什么或在哪里可以找到它。
当我没有要发送的值时,如何检查发件人是否能够发送?
Sender有一个try_send方法:
尝试立即向该发件人发送消息
此方法与 send 不同,如果通道缓冲区已满或没有接收器正在等待获取某些数据,则立即返回。与send相比,该函数有两种失败情况,而不是一种(一种是断开连接,一种是缓冲区已满)。
使用它来代替send并检查错误:
if let Err(TrySendError::Closed(_)) = tx.send(result).await {
break;
}
Run Code Online (Sandbox Code Playgroud)
poll_fn通过使用crate可以做你想做的事情futures。它改编了一个返回函数Poll来返回一个Future
use futures::future::poll_fn; // 0.3.5
use std::future::Future;
use tokio::sync::mpsc::{channel, error::ClosedError, Sender}; // 0.2.22
use tokio::time::delay_for; // 0.2.22
fn wait_until_ready<'a, T>(
sender: &'a mut Sender<T>,
) -> impl Future<Output = Result<(), ClosedError>> + 'a {
poll_fn(move |cx| sender.poll_ready(cx))
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = channel::<i32>(1);
tokio::spawn(async move {
// Receive one value and close the channel;
let val = rx.recv().await;
println!("{:?}", val);
});
wait_until_ready(&mut tx).await.unwrap();
tx.send(123).await.unwrap();
wait_until_ready(&mut tx).await.unwrap();
delay_for(std::time::Duration::from_secs(1)).await;
tx.send(456).await.unwrap(); // 456 likely never printed out,
// despite having a positive readiness response
// and the send "succeeding"
}
Run Code Online (Sandbox Code Playgroud)
但请注意,在一般情况下,这很容易受到TOCTOU的影响。即使Sender'spoll_ready在通道中保留了一个时隙供以后使用,接收端也有可能在就绪检查和实际发送之间关闭。我试图在代码中指出这一点。