如何阅读基于Tokio的Hyper请求的整个主体?

Jay*_*tor 3 rust hyper rust-tokio

我想使用Hyper的当前主分支编写服务器,该分支保存由POST请求传递的消息,并将此消息发送到每个传入的GET请求.

我有这个,大多是从Hyper示例目录中复制的:

extern crate futures;
extern crate hyper;
extern crate pretty_env_logger;

use futures::future::FutureResult;

use hyper::{Get, Post, StatusCode};
use hyper::header::{ContentLength};
use hyper::server::{Http, Service, Request, Response};
use futures::Stream;

struct Echo {
    data: Vec<u8>,
}

impl Echo {
    fn new() -> Self {
        Echo {
            data: "text".into(),
        }
    }
}

impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = FutureResult<Response, hyper::Error>;

    fn call(&self, req: Self::Request) -> Self::Future {
        let resp = match (req.method(), req.path()) {
            (&Get, "/") | (&Get, "/echo") => {
                Response::new()
                    .with_header(ContentLength(self.data.len() as u64))
                    .with_body(self.data.clone())
            },
            (&Post, "/") => {
                //self.data.clear(); // argh. &self is not mutable :(
                // even if it was mutable... how to put the entire body into it?
                //req.body().fold(...) ?
                let mut res = Response::new();
                if let Some(len) = req.headers().get::<ContentLength>() {
                    res.headers_mut().set(ContentLength(0));
                }
                res.with_body(req.body())
            },
            _ => {
                Response::new()
                    .with_status(StatusCode::NotFound)
            }
        };
        futures::future::ok(resp)
    }

}


fn main() {
    pretty_env_logger::init().unwrap();
    let addr = "127.0.0.1:12346".parse().unwrap();

    let server = Http::new().bind(&addr, || Ok(Echo::new())).unwrap();
    println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
    server.run().unwrap();
}
Run Code Online (Sandbox Code Playgroud)

如何打开的req.body()(这似乎是一个StreamChunks)成Vec<u8>?我假设我必须以某种方式返回一个Future消耗它Stream并将其变成单个Vec<u8>,也许用fold().但我不知道该怎么做.

euc*_*lio 29

Hyper 0.13为此提供了一个body::to_bytes功能

use hyper::body;
use hyper::{Body, Response};

pub async fn read_response_body(res: Response<Body>) -> Result<String, hyper::Error> {
    let bytes = body::to_bytes(res.into_body()).await?;
    Ok(String::from_utf8(bytes.to_vec()).expect("response was not valid utf-8"))
}
Run Code Online (Sandbox Code Playgroud)

  • 响应[实现了`to_bytes`所需的`HttpBody`](https://docs.rs/hyper/0.13.1/hyper/body/trait.HttpBody.html#impl-Body-2),所以不需要调用`显式地进入_body()`。并且`Bytes`实现了`AsRef&lt;[u8]&gt;`,所以你可以避免`Vec`分配。 (2认同)

She*_*ter 8

我将简化问题,只返回总字节数,而不是回显整个流.

简短的方法

由于期货0.1.14,您可以使用Stream::concat2将所有数据合并为一个:

fn concat2(self) -> Concat2<Self>
where
    Self: Sized,
    Self::Item: Extend<<Self::Item as IntoIterator>::Item> + IntoIterator + Default, 
Run Code Online (Sandbox Code Playgroud)

超0.12 + Stream::concat2

use futures::{
    future::{self, Either},
    Future, Stream,
}; // 0.1.25

use hyper::{server::Server, service, Body, Method, Request, Response}; // 0.12.20

use tokio; // 0.1.14

fn main() {
    let addr = "127.0.0.1:12346".parse().expect("Unable to parse address");

    let server = Server::bind(&addr).serve(|| service::service_fn(echo));

    println!("Listening on http://{}.", server.local_addr());

    let server = server.map_err(|e| eprintln!("Error: {}", e));
    tokio::run(server);
}

fn echo(req: Request<Body>) -> impl Future<Item = Response<Body>, Error = hyper::Error> {
    let (parts, body) = req.into_parts();

    match (parts.method, parts.uri.path()) {
        (Method::POST, "/") => {
            let entire_body = body.concat2();
            let resp = entire_body.map(|body| {
                let body = Body::from(format!("Read {} bytes", body.len()));
                Response::new(body)
            });
            Either::A(resp)
        }
        _ => {
            let body = Body::from("Can only POST to /");
            let resp = future::ok(Response::new(body));
            Either::B(resp)
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

您也可以将其Bytes转换为Vec<u8>通道entire_body.to_vec(),然后将其转换为String.

也可以看看:

漫长的道路

类似于Iterator::fold,Stream::fold取一个累加器(被调用init)和一个在累加器上运行的函数和一个来自流的项.函数的结果必须是与原始函数具有相同错误类型的另一个未来.总结果本身就是未来.

fn fold<F, T, Fut>(self, init: T, f: F) -> Fold<Self, F, Fut, T>
where
    F: FnMut(T, Self::Item) -> Fut,
    Fut: IntoFuture<Item = T>,
    Self::Error: From<Fut::Error>,
    Self: Sized,
Run Code Online (Sandbox Code Playgroud)

我们可以使用a Vec作为累加器.BodyStream实现返回了一个Chunk.这实现了Deref<[u8]>,所以我们可以使用它来将每个块的数据附加到Vec.

超0.11 + Stream::fold

extern crate futures; // 0.1.23
extern crate hyper;   // 0.11.27

use futures::{Future, Stream};
use hyper::{
    server::{Http, Request, Response, Service}, Post,
};

fn main() {
    let addr = "127.0.0.1:12346".parse().unwrap();

    let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
    println!(
        "Listening on http://{} with 1 thread.",
        server.local_addr().unwrap()
    );
    server.run().unwrap();
}

struct Echo;

impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<futures::Future<Item = Response, Error = Self::Error>>;

    fn call(&self, req: Self::Request) -> Self::Future {
        match (req.method(), req.path()) {
            (&Post, "/") => {
                let f = req.body()
                    .fold(Vec::new(), |mut acc, chunk| {
                        acc.extend_from_slice(&*chunk);
                        futures::future::ok::<_, Self::Error>(acc)
                    })
                    .map(|body| Response::new().with_body(format!("Read {} bytes", body.len())));

                Box::new(f)
            }
            _ => panic!("Nope"),
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

你也可以将其转换Vec<u8> body为a String.

也可以看看:

产量

从命令行调用时,我们可以看到结果:

$ curl -X POST --data hello http://127.0.0.1:12346/
Read 5 bytes
Run Code Online (Sandbox Code Playgroud)

警告

这两种解决方案都允许恶意最终用户POST一个无限大小的文件,这会导致机器耗尽内存.根据预期用途,您可能希望在读取的字节数上建立某种上限,可能会在某个断点处写入文件系统.

  • @JayStrictor因为给'fold`的闭包需要返回一个未来:`F:FnMut(T,Self :: Item) - > Fut`.这允许操作本身花费时间.由于`extend_from_slice`是同步的,我们使用`future :: ok`"提升"结果.这与`类型Future = FutureResult`非常分开,后者用作处理程序的返回值(我用懒惰的方式装箱). (2认同)