我有一个Vec<tokio::sync::broadcast::Receiver<String>>(基本上是通道接收器的向量)。我想订阅所有这些内容并从他们那里获取消息。我该怎么做?
broadcast::Receiver还不是一个流,它只是一个具有recv()函数的对象。要组合其中多个,您必须首先将它们转换为流。
幸运的是,有一个tokio-streams专门用于此目的的板条箱。
一旦接收器转换为流,您就可以使用futures::stream::select_all它们来组合它们:
use futures::stream::select_all;
use futures::StreamExt;
use tokio::time::{sleep, Duration};
use tokio_stream::wrappers::BroadcastStream;
#[tokio::main]
async fn main() {
let (sender1, receiver1) = tokio::sync::broadcast::channel(5);
let (sender2, receiver2) = tokio::sync::broadcast::channel(5);
let receivers = vec![receiver1, receiver2];
// Send on all channels
tokio::spawn(async move {
for i in 0..5 {
sleep(Duration::from_millis(50)).await;
sender1.send(format!("A{i}")).unwrap();
sleep(Duration::from_millis(50)).await;
sender2.send(format!("B{i}")).unwrap();
}
});
// Receive on all channels simultaneously
let mut fused_streams = select_all(receivers.into_iter().map(BroadcastStream::new));
while let Some(value) = fused_streams.next().await {
println!("Got value: {}", value.unwrap());
}
}
Run Code Online (Sandbox Code Playgroud)
Got value: A0
Got value: B0
Got value: A1
Got value: B1
Got value: A2
Got value: B2
Got value: A3
Got value: B3
Got value: A4
Got value: B4
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1432 次 |
| 最近记录: |