rust 一次从多个渠道接收消息

moh*_*vad 5 rust rust-tokio

我有一个Vec<tokio::sync::broadcast::Receiver<String>>(基本上是通道接收器的向量)。我想订阅所有这些内容并从他们那里获取消息。我该怎么做?

Fin*_*nis 7

broadcast::Receiver还不是一个流,它只是一个具有recv()函数的对象。要组合其中多个,您必须首先将它们转换为流。

幸运的是,有一个tokio-streams专门用于此目的的板条箱。

一旦接收器转换为流,您就可以使用futures::stream::select_all它们来组合它们:

use futures::stream::select_all;
use futures::StreamExt;
use tokio::time::{sleep, Duration};
use tokio_stream::wrappers::BroadcastStream;

#[tokio::main]
async fn main() {
    let (sender1, receiver1) = tokio::sync::broadcast::channel(5);
    let (sender2, receiver2) = tokio::sync::broadcast::channel(5);

    let receivers = vec![receiver1, receiver2];

    // Send on all channels
    tokio::spawn(async move {
        for i in 0..5 {
            sleep(Duration::from_millis(50)).await;
            sender1.send(format!("A{i}")).unwrap();
            sleep(Duration::from_millis(50)).await;
            sender2.send(format!("B{i}")).unwrap();
        }
    });

    // Receive on all channels simultaneously
    let mut fused_streams = select_all(receivers.into_iter().map(BroadcastStream::new));
    while let Some(value) = fused_streams.next().await {
        println!("Got value: {}", value.unwrap());
    }
}
Run Code Online (Sandbox Code Playgroud)
Got value: A0
Got value: B0
Got value: A1
Got value: B1
Got value: A2
Got value: B2
Got value: A3
Got value: B3
Got value: A4
Got value: B4
Run Code Online (Sandbox Code Playgroud)