She*_*ter 4 asynchronous future stream rust async-await
我有一个异步Stream,我想从中获取第一个值。我怎样才能这样做呢?
use futures::Stream; // 0.3.5
async fn example<T>(s: impl Stream<Item = T>) -> Option<T> {
todo!("What goes here?")
}
Run Code Online (Sandbox Code Playgroud)
您可以使用StreamExt::next:
use futures::{Stream, StreamExt}; // 0.3.5
async fn example<T>(mut s: impl Stream<Item = T> + Unpin) -> Option<T> {
s.next().await
}
Run Code Online (Sandbox Code Playgroud)
您可以使用StreamExt::into_future:
use futures::{FutureExt, Stream, StreamExt}; // 0.3.5
async fn example<T>(s: impl Stream<Item = T> + Unpin) -> Option<T> {
s.into_future().map(|(v, _)| v).await
}
Run Code Online (Sandbox Code Playgroud)
在极少数情况下,您可能希望使用future::poll_fn来完全控制:
use futures::{future, task::Poll, Stream, StreamExt}; // 0.3.5
async fn example<T>(mut s: impl Stream<Item = T> + Unpin) -> Option<T> {
future::poll_fn(|ctx| {
// Could use methods like `Poll::map` or
// the `?` operator instead of a `match`.
match s.poll_next_unpin(ctx) {
Poll::Ready(v) => {
// Do any special logic here
Poll::Ready(v)
}
Poll::Pending => Poll::Pending,
}
})
.await
}
Run Code Online (Sandbox Code Playgroud)
也可以看看:
如果您想对流中的所有值进行操作,生成单个值,您可以使用StreamExt::fold:
use futures::{Stream, StreamExt}; // 0.3.5
async fn example(s: impl Stream + Unpin) -> usize {
s.fold(0, |st, _| async move { st + 1 }).await
}
Run Code Online (Sandbox Code Playgroud)
如果您想对流中的所有值进行操作而不产生值,您可以使用StreamExt::for_each:
use futures::{Stream, StreamExt}; // 0.3.5
async fn example<I: std::fmt::Debug>(s: impl Stream<Item = I> + Unpin) {
s.for_each(|i| async {
dbg!(i);
})
.await;
}
Run Code Online (Sandbox Code Playgroud)
也可以看看:
Unpin这些例子都需要传入的Stream工具Unpin。Box::pin您还可以通过宏自行固定流pin_mut!。
也可以看看: