为什么在 then 中使用异步块会使我的流取消固定?

Art*_*nko 6 rust

我是 Rust 新手,如果我使用的术语不正确,我很抱歉。也许我对问题的用词选择不正确。

我正在玩流,我需要在流元素之间有一些延迟。所以我写了这个:

use futures::stream;
use futures::StreamExt;
use tokio::time;

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(0..1000).then(|x| async move {
        time::delay_for(std::time::Duration::from_millis(500)).await;
        x + 1
    });
    while let Some(x) = stream.next().await {
        println!("{:?}", x)
    }
}
Run Code Online (Sandbox Code Playgroud)

我遇到了很多编译错误,但最重要的错误与固定有关。他们来了:

error[E0277]: `std::future::from_generator::GenFuture<[static generator@src/main.rs:7:64: 10:6 x:_ _]>` cannot be unpinned
  --> src/main.rs:11:32
   |
11 |     while let Some(x) = stream.next().await {
   |                                ^^^^ within `futures_util::stream::stream::then::_::__Then<'_, futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@src/main.rs:7:64: 10:6 x:_ _]>`
   |
   = note: required because it appears within the type `impl core::future::future::Future`
   = note: required because it appears within the type `std::option::Option<impl core::future::future::Future>`
   = note: required because it appears within the type `futures_util::stream::stream::then::_::__Then<'_, futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`
   = note: required because of the requirements on the impl of `std::marker::Unpin` for `futures_util::stream::stream::then::Then<futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`

error[E0277]: `std::future::from_generator::GenFuture<[static generator@src/main.rs:7:64: 10:6 x:_ _]>` cannot be unpinned
  --> src/main.rs:11:25
   |
11 |     while let Some(x) = stream.next().await {
   |                         ^^^^^^^^^^^^^^^^^^^ within `futures_util::stream::stream::then::_::__Then<'_, futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@src/main.rs:7:64: 10:6 x:_ _]>`
   |
   = note: required because it appears within the type `impl core::future::future::Future`
   = note: required because it appears within the type `std::option::Option<impl core::future::future::Future>`
   = note: required because it appears within the type `futures_util::stream::stream::then::_::__Then<'_, futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`
   = note: required because of the requirements on the impl of `std::marker::Unpin` for `futures_util::stream::stream::then::Then<futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`
   = note: required because of the requirements on the impl of `core::future::future::Future` for `futures_util::stream::stream::next::Next<'_, futures_util::stream::stream::then::Then<futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>>`
Run Code Online (Sandbox Code Playgroud)

如果我将代码更改为:

use futures::stream;
use futures::StreamExt;
use tokio::time;

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(0..1000).then(|x|  {
        futures::future::ready(x + 1)
    });
    while let Some(x) = stream.next().await {
        println!("{:?}", x)
    }
}
Run Code Online (Sandbox Code Playgroud)

或者对此:

use futures::stream;
use futures::StreamExt;
use tokio::time;

#[tokio::main]
async fn main() {
    stream::iter(0..1000)
        .then(|x| async move {
            time::delay_for(std::time::Duration::from_millis(500)).await;
            x + 1
        })
        .for_each(|x| async move { println!("{:?}", x) })
        .await;
}
Run Code Online (Sandbox Code Playgroud)

它编译。

then我认为这与组合器 和的固定和同时使用有关while,但我无法理解它。

rod*_*igo 7

我认为问题归结为async区块不是这样的事实Unpin。可以用这段代码来证明:

fn check_unpin<F: Unpin>(_: F) { }

fn main() {
    check_unpin(async {});
}
Run Code Online (Sandbox Code Playgroud)

失败并显示相当神秘的消息:

error[E0277]: `std::future::from_generator::GenFuture<[static generator@src/main.rs:4:23: 4:25 _]>` cannot be unpinned
  --> src/main.rs:4:5
   |
1  | fn check_unpin<F: Unpin>(_: F) { }
   |                   ----- required by this bound in `check_unpin`
...
4  |     check_unpin(async {});
   |     ^^^^^^^^^^^ within `impl std::future::Future`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@src/main.rs:4:23: 4:25 _]>`
   |
   = note: required because it appears within the type `impl std::future::Future`
Run Code Online (Sandbox Code Playgroud)

我认为这GenFuture是将async块转换为impl Future.

现在回到您的问题,如果流和闭包返回的值都是(文档中并不完全清楚,但我从源代码中推断出这一点),则组合器then()返回一个值。is ,但是当你写入时,你返回的块不是,因此你的错误。UnpinFutureUnpinstream::iter Unpin|x| async move { x + 1}asyncUnpin

如果你使用futures::future::ready(x + 1)它,它就能工作,因为它Future 实现了 Unpin.

如果您使用StreamExt::for_each它,它就可以工作,因为它Self不需要Unpin。它不是Unpin它本身,但这并不重要,因为您tokio::main在轮询之前将其发送到内部固定所有内容。

如果您希望原始代码正常工作,您只需手动固定流(playground):

use futures::stream;
use futures::StreamExt;
use tokio::time;
use pin_utils::pin_mut;

#[tokio::main]
async fn main() {
    let stream = stream::iter(0..1000).then(|x| async move {
        time::delay_for(std::time::Duration::from_millis(500)).await;
        x + 1
    });
    pin_mut!(stream); //<---- here, pinned!
    while let Some(x) = stream.next().await {
        println!("{:?}", x)
    }
}
Run Code Online (Sandbox Code Playgroud)