一个答案 如何阅读基于东京-超请求的整个身体?建议:
您可能希望对读取的字节数设置某种上限 [使用时
futures::Stream::concat2]
我怎样才能真正做到这一点?例如,这里有一些代码模拟向我的服务发送无限量数据的恶意用户:
extern crate futures; // 0.1.25
use futures::{prelude::*, stream};
fn some_bytes() -> impl Stream<Item = Vec<u8>, Error = ()> {
stream::repeat(b"0123456789ABCDEF".to_vec())
}
fn limited() -> impl Future<Item = Vec<u8>, Error = ()> {
some_bytes().concat2()
}
fn main() {
let v = limited().wait().unwrap();
println!("{}", v.len());
}
Run Code Online (Sandbox Code Playgroud)
一种解决方案是创建一个流组合器,一旦超过某个字节阈值就结束该流。这是一种可能的实现:
struct TakeBytes<S> {
inner: S,
seen: usize,
limit: usize,
}
impl<S> Stream for TakeBytes<S>
where
S: Stream<Item = Vec<u8>>,
{
type Item = Vec<u8>;
type Error = S::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.seen >= self.limit {
return Ok(Async::Ready(None)); // Stream is over
}
let inner = self.inner.poll();
if let Ok(Async::Ready(Some(ref v))) = inner {
self.seen += v.len();
}
inner
}
}
trait TakeBytesExt: Sized {
fn take_bytes(self, limit: usize) -> TakeBytes<Self>;
}
impl<S> TakeBytesExt for S
where
S: Stream<Item = Vec<u8>>,
{
fn take_bytes(self, limit: usize) -> TakeBytes<Self> {
TakeBytes {
inner: self,
limit,
seen: 0,
}
}
}
Run Code Online (Sandbox Code Playgroud)
然后可以将其链接到流之前concat2:
fn limited() -> impl Future<Item = Vec<u8>, Error = ()> {
some_bytes().take_bytes(999).concat2()
}
Run Code Online (Sandbox Code Playgroud)
这个实现有一些注意事项:
Vec<u8>. 当然,您可以引入泛型以使其更广泛地适用。要记住的另一件事是,您希望尝试尽可能低地解决这个问题——如果数据源已经分配了 1 GB 的内存,那么设置限制将无济于事。
| 归档时间: |
|
| 查看次数: |
233 次 |
| 最近记录: |