如何查找 tokio::sync::mpsc::Receiver 是否已关闭?

Art*_*nko 5 future rust rust-tokio

我有一个循环,我在其中做一些工作并使用 发送结果Sender。这项工作需要时间,如果失败我需要重试。当我重试时,接收器可能已关闭,我的重试将是浪费时间。因此,我需要一种方法来检查是否Receiver可用而不发送消息。

在理想的世界中,我希望我的代码在伪代码中看起来像这样:

let (tx, rx) = tokio::sync::mpsc::channel(1);

tokio::spawn(async move {
   // do som stuff with rx and drop it after some time
    rx.recv(...).await;
});

let mut attempts = 0;
loop {
    if tx.is_closed() {
       break;
    }
    if let Ok(result) = do_work().await {
        attempts = 0;
        let _ = tx.send(result).await;
    } else {
        if attempts >= 10 {
            break;
        } else {
            attempts += 1;
            continue;
        }
    }
};
Run Code Online (Sandbox Code Playgroud)

问题是Sender没有is_closed方法。它确实有pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>>,但我不知道Context是什么或在哪里可以找到它。

当我没有要发送的值时,如何检查发件人是否能够发送?

jus*_*nas 3

Sender有一个try_send方法:

尝试立即向该发件人发送消息

此方法与 send 不同,如果通道缓冲区已满或没有接收器正在等待获取某些数据,则立即返回。与send相比,该函数有两种失败情况,而不是一种(一种是断开连接,一种是缓冲区已满)。

使用它来代替send并检查错误:

if let Err(TrySendError::Closed(_)) = tx.send(result).await {
    break;
}
Run Code Online (Sandbox Code Playgroud)

poll_fn通过使用crate可以做你想做的事情futures。它改编了一个返回函数Poll来返回一个Future

use futures::future::poll_fn; // 0.3.5
use std::future::Future;
use tokio::sync::mpsc::{channel, error::ClosedError, Sender}; // 0.2.22
use tokio::time::delay_for; // 0.2.22

fn wait_until_ready<'a, T>(
    sender: &'a mut Sender<T>,
) -> impl Future<Output = Result<(), ClosedError>> + 'a {
    poll_fn(move |cx| sender.poll_ready(cx))
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = channel::<i32>(1);

    tokio::spawn(async move {
        // Receive one value and close the channel;
        let val = rx.recv().await;
        println!("{:?}", val);
    });

    wait_until_ready(&mut tx).await.unwrap();
    tx.send(123).await.unwrap();

    wait_until_ready(&mut tx).await.unwrap();
    delay_for(std::time::Duration::from_secs(1)).await;
    tx.send(456).await.unwrap(); // 456 likely never printed out,
                                 // despite having a positive readiness response
                                 // and the send "succeeding"
}
Run Code Online (Sandbox Code Playgroud)

但请注意,在一般情况下,这很容易受到TOCTOU的影响。即使Sender'spoll_ready在通道中保留了一个时隙供以后使用,接收端也有可能在就绪检查和实际发送之间关闭。我试图在代码中指出这一点。