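I'm new to Rust, so I apologize if I use terms incorrectly; my wording of the problem may be off as well.
I'm playing with streams and I need a delay between stream elements. So I wrote this: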
use futures::stream;
use futures::StreamExt;
use tokio::time;

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(0..1000).then(|x| async move {
        time::delay_for(std::time::Duration::from_millis(500)).await;
        x + 1
    });
    while let Some(x) = stream.next().await {
        println!("{:?}", x)
    }
}
I get a lot of compilation errors, but the most important ones are related to pinning. Here they are:
error[E0277]: `std::future::from_generator::GenFuture<[static generator@src/main.rs:7:64: 10:6 x:_ _]>` cannot be unpinned
--> src/main.rs:11:32
|
11 | while let Some(x) = stream.next().await {
| ^^^^ within `futures_util::stream::stream::then::_::__Then<'_, futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@src/main.rs:7:64: 10:6 x:_ _]>`
|
= note: required because it appears within the type `impl core::future::future::Future`
= note: required because it appears within the type `std::option::Option<impl core::future::future::Future>`
= note: required because it appears within the type `futures_util::stream::stream::then::_::__Then<'_, futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`
= note: required because of the requirements on the impl of `std::marker::Unpin` for `futures_util::stream::stream::then::Then<futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`
error[E0277]: `std::future::from_generator::GenFuture<[static generator@src/main.rs:7:64: 10:6 x:_ _]>` cannot be unpinned
--> src/main.rs:11:25
|
11 | while let Some(x) = stream.next().await {
| ^^^^^^^^^^^^^^^^^^^ within `futures_util::stream::stream::then::_::__Then<'_, futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@src/main.rs:7:64: 10:6 x:_ _]>`
|
= note: required because it appears within the type `impl core::future::future::Future`
= note: required because it appears within the type `std::option::Option<impl core::future::future::Future>`
= note: required because it appears within the type `futures_util::stream::stream::then::_::__Then<'_, futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`
= note: required because of the requirements on the impl of `std::marker::Unpin` for `futures_util::stream::stream::then::Then<futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`
= note: required because of the requirements on the impl of `core::future::future::Future` for `futures_util::stream::stream::next::Next<'_, futures_util::stream::stream::then::Then<futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>>`
If I change the code to this:
use futures::stream;
use futures::StreamExt;
use tokio::time;

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(0..1000).then(|x| {
        futures::future::ready(x + 1)
    });
    while let Some(x) = stream.next().await {
        println!("{:?}", x)
    }
}
Or to this:
use futures::stream;
use futures::StreamExt;
use tokio::time;

#[tokio::main]
async fn main() {
    stream::iter(0..1000)
        .then(|x| async move {
            time::delay_for(std::time::Duration::from_millis(500)).await;
            x + 1
        })
        .for_each(|x| async move { println!("{:?}", x) })
        .await;
}
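it compiles.
I assume this has something to do with pinning and with using the `then` combinator together with `while`, but I can't wrap my head around it.
I think the problem boils down to the fact that `async` blocks are not `Unpin`. This can be demonstrated with the following code: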
fn check_unpin<F: Unpin>(_: F) { }

fn main() {
    check_unpin(async {});
}
It fails with a rather cryptic message:
error[E0277]: `std::future::from_generator::GenFuture<[static generator@src/main.rs:4:23: 4:25 _]>` cannot be unpinned
--> src/main.rs:4:5
|
1 | fn check_unpin<F: Unpin>(_: F) { }
| ----- required by this bound in `check_unpin`
...
4 | check_unpin(async {});
| ^^^^^^^^^^^ within `impl std::future::Future`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@src/main.rs:4:23: 4:25 _]>`
|
= note: required because it appears within the type `impl std::future::Future`
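I reckon that `GenFuture` is the internal type that converts an `async` block into an `impl Future`.
Now, back to your issue. `StreamExt::next` requires the stream to be `Unpin`, and the `then()` combinator returns an `Unpin` value only if both the stream and the `Future` returned by the closure are `Unpin` (this is not entirely clear in the documentation, but I inferred it from the source code). `stream::iter` is `Unpin`, but when you write `|x| async move { x + 1 }` you return an `async` block, which is not `Unpin`, hence your error.
If you use `futures::future::ready(x + 1)` it works simply because that `Future` implements `Unpin`.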
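To make the distinction concrete, here is a small sketch that uses only the standard library. The helper `is_unpin` is a hypothetical name introduced for illustration, and `std::future::ready` stands in for `futures::future::ready`, on the assumption that the two behave alike with respect to `Unpin`. It shows that a ready future, a boxed and pinned `async` block, and a stack-pinned `async` block all satisfy an `Unpin` bound, while a bare `async` block does not.

```rust
use std::pin::pin;

// Hypothetical helper: it only compiles for `Unpin` arguments.
// The `bool` return value exists so the result can be asserted.
fn is_unpin<T: Unpin>(_: &T) -> bool {
    true
}

fn main() {
    // `std::future::ready` returns `Ready<T>`, which implements `Unpin`.
    let ready = std::future::ready(1);
    assert!(is_unpin(&ready));

    // Heap pinning: `Pin<Box<_>>` is `Unpin` whatever it points to.
    let boxed = Box::pin(async { 1 });
    assert!(is_unpin(&boxed));

    // Stack pinning: `Pin<&mut _>` is `Unpin` as well.
    let on_stack = pin!(async { 1 });
    assert!(is_unpin(&on_stack));

    // The bare async block is rejected with "cannot be unpinned":
    // is_unpin(&async { 1 });
}
```

The pinned handles are `Unpin` because moving the handle does not move the future it points to.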
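If you use `StreamExt::for_each` it works because it does not require `Self` to be `Unpin`. The future it returns is not `Unpin` itself, but that does not matter, because you hand it over to `tokio::main`, which pins everything internally before polling.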
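The difference between borrowing and consuming can be sketched with plain futures and the standard library only. This is an analogy under stated assumptions, not the actual `futures` implementation: `poll_borrowed`, `poll_owned`, and `NoopWake` are hypothetical names, with `poll_borrowed` shaped like `next()` (it only gets a `&mut` borrow) and `poll_owned` shaped like `for_each` (it takes the value and can pin it itself).

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough for futures that are ready on first poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Shaped like `next()`: only a `&mut` borrow is available, so the safe way
// to obtain a `Pin<&mut F>` is `Pin::new`, which requires `F: Unpin`.
fn poll_borrowed<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

// Shaped like `for_each`: the future is taken by value, so it can be pinned
// here and never moved again. No `Unpin` bound is needed.
fn poll_owned<F: Future>(fut: F) -> Poll<F::Output> {
    let mut fut = pin!(fut);
    poll_borrowed(&mut fut)
}

fn main() {
    // By value: a plain async block is accepted.
    assert_eq!(poll_owned(async { 41 + 1 }), Poll::Ready(42));

    // By reference: pin first, then borrow the pinned handle.
    let mut pinned = pin!(async { 41 + 1 });
    assert_eq!(poll_borrowed(&mut pinned), Poll::Ready(42));

    // Borrowing the bare async block is rejected with "cannot be unpinned":
    // let mut fut = async { 41 + 1 };
    // poll_borrowed(&mut fut);
}
```

The second call in `main` follows the same pattern as pinning a stream before calling `next()` in a `while let` loop.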
If you want your original code to work, you just have to pin your stream manually (playground):
use futures::stream;
use futures::StreamExt;
use tokio::time;
use pin_utils::pin_mut;

#[tokio::main]
async fn main() {
    let stream = stream::iter(0..1000).then(|x| async move {
        time::delay_for(std::time::Duration::from_millis(500)).await;
        x + 1
    });
    pin_mut!(stream); // <---- here, pinned!
    while let Some(x) = stream.next().await {
        println!("{:?}", x)
    }
}