如何对 futures::Stream::concat2 读取的字节数应用限制?

She*_*ter 2 future rust

一个答案 如何阅读基于东京-超请求的整个身体?建议:

您可能希望对读取的字节数设置某种上限 [使用时futures::Stream::concat2]

我怎样才能真正做到这一点?例如,这里有一些代码模拟向我的服务发送无限量数据的恶意用户:

extern crate futures; // 0.1.25

use futures::{prelude::*, stream};

fn some_bytes() -> impl Stream<Item = Vec<u8>, Error = ()> {
    stream::repeat(b"0123456789ABCDEF".to_vec())
}

fn limited() -> impl Future<Item = Vec<u8>, Error = ()> {
    some_bytes().concat2()
}

fn main() {
    let v = limited().wait().unwrap();
    println!("{}", v.len());
}
Run Code Online (Sandbox Code Playgroud)

She*_*ter 5

一种解决方案是创建一个流组合器,一旦超过某个字节阈值就结束该流。这是一种可能的实现:

struct TakeBytes<S> {
    inner: S,
    seen: usize,
    limit: usize,
}

impl<S> Stream for TakeBytes<S>
where
    S: Stream<Item = Vec<u8>>,
{
    type Item = Vec<u8>;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        if self.seen >= self.limit {
            return Ok(Async::Ready(None)); // Stream is over
        }

        let inner = self.inner.poll();
        if let Ok(Async::Ready(Some(ref v))) = inner {
            self.seen += v.len();
        }
        inner
    }
}

trait TakeBytesExt: Sized {
    fn take_bytes(self, limit: usize) -> TakeBytes<Self>;
}

impl<S> TakeBytesExt for S
where
    S: Stream<Item = Vec<u8>>,
{
    fn take_bytes(self, limit: usize) -> TakeBytes<Self> {
        TakeBytes {
            inner: self,
            limit,
            seen: 0,
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

然后可以将其链接到流之前concat2

fn limited() -> impl Future<Item = Vec<u8>, Error = ()> {
    some_bytes().take_bytes(999).concat2()
}
Run Code Online (Sandbox Code Playgroud)

这个实现有一些注意事项:

  • 它只适用于 Vec<u8>. 当然,您可以引入泛型以使其更广泛地适用。
  • 它允许超过限制的字节数进入,它只是在那点之后停止流。这些类型的决策取决于应用程序。

要记住的另一件事是,您希望尝试尽可能低地解决这个问题——如果数据源已经分配了 1 GB 的内存,那么设置限制将无济于事。