如何将 Stream 转换为 Future?

She*_*ter 4 asynchronous future stream rust async-await

我有一个异步Stream,我想从中获取第一个值。我怎样才能这样做呢?

use futures::Stream; // 0.3.5

async fn example<T>(s: impl Stream<Item = T>) -> Option<T> {
    todo!("What goes here?")
}
Run Code Online (Sandbox Code Playgroud)

She*_*ter 5

您可以使用StreamExt::next

use futures::{Stream, StreamExt}; // 0.3.5

async fn example<T>(mut s: impl Stream<Item = T> + Unpin) -> Option<T> {
    s.next().await
}
Run Code Online (Sandbox Code Playgroud)

您可以使用StreamExt::into_future

use futures::{FutureExt, Stream, StreamExt}; // 0.3.5

async fn example<T>(s: impl Stream<Item = T> + Unpin) -> Option<T> {
    s.into_future().map(|(v, _)| v).await
}
Run Code Online (Sandbox Code Playgroud)

在极少数情况下,您可能希望使用future::poll_fn来完全控制:

use futures::{future, task::Poll, Stream, StreamExt}; // 0.3.5

async fn example<T>(mut s: impl Stream<Item = T> + Unpin) -> Option<T> {
    future::poll_fn(|ctx| {
        // Could use methods like `Poll::map` or
        // the `?` operator instead of a `match`.
        match s.poll_next_unpin(ctx) {
            Poll::Ready(v) => {
                // Do any special logic here
                Poll::Ready(v)
            }
            Poll::Pending => Poll::Pending,
        }
    })
    .await
}
Run Code Online (Sandbox Code Playgroud)

也可以看看:

更广泛地

如果您想对流中的所有值进行操作,生成单个值,您可以使用StreamExt::fold

use futures::{Stream, StreamExt}; // 0.3.5

async fn example(s: impl Stream + Unpin) -> usize {
    s.fold(0, |st, _| async move { st + 1 }).await
}
Run Code Online (Sandbox Code Playgroud)

如果您想对流中的所有值进行操作而不产生值,您可以使用StreamExt::for_each

use futures::{Stream, StreamExt}; // 0.3.5

async fn example<I: std::fmt::Debug>(s: impl Stream<Item = I> + Unpin) {
    s.for_each(|i| async {
        dbg!(i);
    })
    .await;
}
Run Code Online (Sandbox Code Playgroud)

也可以看看:

Unpin

这些例子都需要传入的Stream工具UnpinBox::pin您还可以通过宏自行固定流pin_mut!

也可以看看: