Jay*_*eis 3 rust rust-tokio rust-async-std
为了理解流是如何工作的,我试图实现一个使用 random.org 的无限数生成器。我做的第一件事是实现一个版本,在该版本中我将调用一个名为 get_number 的异步函数,它将填充缓冲区并返回下一个可能的数字:
struct RandomGenerator {
buffer: Vec<u8>,
position: usize,
}
impl RandomGenerator {
pub fn new() -> RandomGenerator {
Self {
buffer: Vec::new(),
position: 0,
}
}
pub async fn get_number(&mut self) -> u8 {
self.fill_buffer().await;
let value = self.buffer[self.position];
self.position += 1;
value
}
async fn fill_buffer(&mut self) {
if self.buffer.is_empty() || self.is_buffer_depleted() {
let new_numbers = self.fetch_numbers().await;
drop(replace(&mut self.buffer, new_numbers));
self.position = 0;
}
}
fn is_buffer_depleted(&self) -> bool {
self.buffer.len() >= self.position
}
async fn fetch_numbers(&mut self) -> Vec<u8> {
let response = reqwest::get("https://www.random.org/integers/?num=10&min=1&max=100&col=1&base=10&format=plain&rnd=new").await.unwrap();
let numbers = response.text().await.unwrap();
numbers
.lines()
.map(|line| line.trim().parse::<u8>().unwrap())
.collect()
}
}
Run Code Online (Sandbox Code Playgroud)
通过这个实现,我可以get_number
在循环中调用该函数并获取我想要的任意数量的数字,但想法是拥有迭代器,这样我就可以调用一堆组合函数,例如take
、take_while
和其他函数。
但是当我尝试实现 Stream 时,问题开始出现:我的第一次尝试是拥有一个包含对生成器的引用的结构
struct RandomGeneratorStream<'a> {
generator: &'a mut RandomGenerator,
}
Run Code Online (Sandbox Code Playgroud)
然后我实现了以下流
impl<'a> Stream for RandomGeneratorStream<'a> {
type Item = u8;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let f = self.get_mut().generator.get_number();
pin_mut!(f);
f.poll_unpin(cx).map(Some)
}
}
Run Code Online (Sandbox Code Playgroud)
但调用这个只会挂起进程
generator.into_stream().take(18).collect::<Vec<u8>>().await
Run Code Online (Sandbox Code Playgroud)
在接下来的尝试中,我尝试使用 pin_mut 在流结构上保留未来的状态!但最终在一生中出现了许多错误而无法解决它们。这种情况下可以做什么呢?这是没有流的工作代码:
use std::mem::replace;
struct RandomGenerator {
buffer: Vec<u8>,
position: usize,
}
impl RandomGenerator {
pub fn new() -> RandomGenerator {
Self {
buffer: Vec::new(),
position: 0,
}
}
pub async fn get_number(&mut self) -> u8 {
self.fill_buffer().await;
let value = self.buffer[self.position];
self.position += 1;
value
}
async fn fill_buffer(&mut self) {
if self.buffer.is_empty() || self.is_buffer_depleted() {
let new_numbers = self.fetch_numbers().await;
drop(replace(&mut self.buffer, new_numbers));
self.position = 0;
}
}
fn is_buffer_depleted(&self) -> bool {
self.buffer.len() >= self.position
}
async fn fetch_numbers(&mut self) -> Vec<u8> {
let response = reqwest::get("https://www.random.org/integers/?num=10&min=1&max=100&col=1&base=10&format=plain&rnd=new").await.unwrap();
let numbers = response.text().await.unwrap();
numbers
.lines()
.map(|line| line.trim().parse::<u8>().unwrap())
.collect()
}
}
#[tokio::main]
async fn main() {
let mut generator = RandomGenerator::new();
dbg!(generator.get_number().await);
}
Run Code Online (Sandbox Code Playgroud)
在这里您可以找到第一个工作示例的链接(我没有调用 random.org,而是使用了 Cursor,因为 dns 解析在 Playground 上不起作用)https://play.rust-lang.org/?version=stable&mode =调试&版本=2018&要点=730eaf1f7db842877d3f3e7ca1c6d2a5
我最后一次尝试流你可以在这里找到https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=de0b212ee70865f6ac6c19430cd952cd
在接下来的尝试中,我尝试使用 pin_mut 在流结构上保留未来的状态!但最终在一生中出现了许多错误而无法解决它们。
你走在正确的轨道上,你需要坚持未来才能poll_next
正常工作。
不幸的是,您会遇到可变引用的障碍。您保留 a 是&mut RandomGenerator
为了重复使用它,但 future 本身也必须保留 a&mut RandomGenerator
才能完成其工作。这将违反可变引用的排他性。无论你用什么方式切割它,都可能会遇到这个问题。
从Future
s 到 a的更好方法Stream
是遵循此处的建议并使用futures::stream::unfold
:
fn as_stream<'a>(&'a mut self) -> impl Stream<Item = u8> + 'a {
futures::stream::unfold(self, |rng| async {
let number = rng.get_number().await;
Some((number, rng))
})
}
Run Code Online (Sandbox Code Playgroud)
在操场上看到它。
这可能不一定能帮助您了解有关流的更多信息,但提供的功能通常比手动滚动更好。这避免了上面的多个可变引用问题的关键原因是因为生成 future 的函数获取了可变引用的所有权,然后在完成时将其归还。这样一次只有一个存在。即使您Stream
自己实现,也必须使用类似的机制。