Using a custom transporter for Rust's hyper HTTP crate

Gue*_*OCs 6 rust hyper

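P.S.: The answer below was helpful, but it is not the answer I need. I have a new problem, so I edited the question.

I am trying to write a custom transporter for the hyper HTTP crate so that I can transmit HTTP packets in my own way.

hyper's HTTP client accepts a custom connector (https://docs.rs/hyper/0.14.2/hyper/client/connect/trait.Connect.html) here: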

pub fn build<C, B>(&self, connector: C) -> Client<C, B> where C: Connect + Clone, B: HttpBody + Send, B::Data: Send,

If we look at

impl<S, T> Connect for S
where
    S: Service<Uri, Response = T> + Send + 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    S::Future: Unpin + Send,
    T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,

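the type T, which is the type of Response, must implement AsyncRead + AsyncWrite, so I chose type Response = Cursor<Vec<u8>>.

Here is my custom transporter. The Response, a std::io::Cursor, is wrapped in CustomResponse so that I can implement AsyncWrite and AsyncRead for it: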

use hyper::service::Service;
use core::task::{Context, Poll};
use core::future::Future;
use std::pin::Pin;
use std::io::Cursor;
use hyper::client::connect::{Connection, Connected};
use tokio::io::{AsyncRead, AsyncWrite};

#[derive(Clone)]
pub struct CustomTransporter;

unsafe impl Send for CustomTransporter {}

impl CustomTransporter {
    pub fn new() -> CustomTransporter {
        CustomTransporter{}
    }
}

impl Connection for CustomTransporter {
    fn connected(&self) -> Connected {
        Connected::new()
    }
}

pub struct CustomResponse {
    //w: Cursor<Vec<u8>>,
    v: Vec<u8>,
    i: i32
}

unsafe impl Send for CustomResponse {
    
}

impl Connection for CustomResponse {
    fn connected(&self) -> Connected {
        println!("connected");
        Connected::new()
    }
}

impl AsyncRead for CustomResponse {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>
    ) -> Poll<std::io::Result<()>> {
        self.i+=1;
        if self.i >=3 {
            println!("poll_read for buf size {}", buf.capacity());
            buf.put_slice(self.v.as_slice());
            println!("did poll_read");
            Poll::Ready(Ok(()))
        } else {
            println!("poll read pending, i={}", self.i);
            Poll::Pending
        }
    }
}

impl AsyncWrite for CustomResponse {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8]
    ) -> Poll<Result<usize, std::io::Error>>{
        //let v = vec!();
        println!("poll_write____");

        let s = match std::str::from_utf8(buf) {
            Ok(v) => v,
            Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
        };

        println!("result: {}, size: {}, i: {}", s, s.len(), self.i);
        if self.i>=0{
            //r
            Poll::Ready(Ok(s.len()))
        }else{
            println!("poll_write pending");
            Poll::Pending
        }
    }
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), std::io::Error>> {
        println!("poll_flush");
        if self.i>=0{
            println!("DID poll_flush");
            Poll::Ready(Ok(()))
        }else{
            println!("poll_flush pending");
            Poll::Pending
        }
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), std::io::Error>>
    {
        println!("poll_shutdown");
        Poll::Ready(Ok(()))
    }
}


impl Service<hyper::Uri> for CustomTransporter {
    type Response = CustomResponse;
    type Error = hyper::http::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        println!("poll_ready");
        Poll::Ready(Ok(()))
        //Poll::Pending
    }

    fn call(&mut self, req: hyper::Uri) -> Self::Future {
        println!("call");
        // create the body
        let body: Vec<u8> = "HTTP/1.1 200 OK\nDate: Mon, 27 Jul 2009 12:28:53 GMT\nServer: Apache/2.2.14 (Win32)\nLast-Modified: Wed, 22 Jul 2009 19:15:56 GMT\nContent-Length: 88\nContent-Type: text/html\nConnection: Closed<html><body><h1>Hello, World!</h1></body></html>".as_bytes()
            .to_owned();
        // Create the HTTP response
        let resp = CustomResponse{
            //w: Cursor::new(body),
            v: body,
            i: 0
        };
         
        // create a response in a future.
        let fut = async move{
            Ok(resp)
        };
        println!("gonna return from call");
        // Return the response as an immediate future
        Box::pin(fut)
    }
}

Then I use it like this:

let connector = CustomTransporter::new();
let client: Client<CustomTransporter, hyper::Body> = Client::builder().build(connector);
let mut res = client.get(url).await.unwrap();

However, it gets stuck: hyper never reads my response, although it does write the GET request to it.

Here is a complete test project: https://github.com/lzunsec/rust_hyper_custom_transporter/blob/39cd036fc929057d975a71969ccbe97312543061/src/custom_req.rs

Run it like this:

cargo run http://google.com

kmd*_*eko 6

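"I can't simply implement Send for Future, and I can't change it through a Future wrapper either. What should I do here?"

It looks like the problem is that your Service::Future is missing the Send bound. The future returned by call is already Send, so it works with one simple change: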

impl Service<hyper::Uri> for CustomTransporter {
    type Response = CustomResponse;
    type Error = hyper::http::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
                                                                                  // ^^^^
    ...

Your code also has a few other errors: an un-inferred vec!(), self: Pin<...> missing mut, CustomResponse should be pub, ...
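
Separately from the compile errors, the hard-coded response in call may also deserve a look. HTTP/1.1 separates header lines with CRLF and requires an empty line between the headers and the body, while the string in the question uses bare \n, runs the last header straight into the HTML, and declares Content-Length: 88 for a 48-byte body. I have not checked how hyper's parser reacts to that input, so treat the following as a minimal sketch of a well-formed response rather than a confirmed fix. build_response is a hypothetical helper name and the header set is only illustrative.

```rust
/// Hypothetical helper: builds a minimal HTTP/1.1 response with CRLF line
/// endings, an empty line before the body, and a Content-Length that is
/// computed from the body instead of being hard-coded.
fn build_response(body: &str) -> Vec<u8> {
    format!(
        "HTTP/1.1 200 OK\r\n\
         Content-Type: text/html\r\n\
         Content-Length: {}\r\n\
         Connection: close\r\n\
         \r\n\
         {}",
        body.len(), // length in bytes, which is what Content-Length expects
        body
    )
    .into_bytes()
}

fn main() {
    let body = "<html><body><h1>Hello, World!</h1></body></html>";
    let response = build_response(body);
    // Print the raw bytes as text to inspect the framing.
    println!("{}", String::from_utf8_lossy(&response));
}
```

The resulting Vec<u8> could be stored in the v field of CustomResponse in place of the literal string.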

You can specify the B of client by letting inference pick it up from a type annotation:

let client: Client<CustomTransporter, hyper::Body> = Client::builder().build(connector);

Or by using the turbofish operator on build:

let client = Client::builder().build::<CustomTransporter, hyper::Body>(connector);
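
The same two options exist for any generic call, not only for build. As a small standard-library illustration (the function names with_annotation and with_turbofish are made up for this example), both of the following resolve the generic parameters of parse and collect, once through an annotation on the binding and once through turbofish on the calls:

```rust
/// Generic parameters are inferred from the annotation on `v`.
fn with_annotation(s: &str) -> Vec<u32> {
    let v: Vec<u32> = s.split(',').map(|p| p.trim().parse().unwrap()).collect();
    v
}

/// Generic parameters are spelled out with turbofish on each call.
fn with_turbofish(s: &str) -> Vec<u32> {
    s.split(',')
        .map(|p| p.trim().parse::<u32>().unwrap())
        .collect::<Vec<u32>>()
}

fn main() {
    // Both forms produce the same value; the choice is stylistic.
    assert_eq!(with_annotation("1, 2, 3"), with_turbofish("1, 2, 3"));
    println!("{:?}", with_turbofish("1, 2, 3"));
}
```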

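I don't know enough about creating custom hyper transports to make it functional, but these fixes make it compile. Hopefully that helps you make progress.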
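
One thing that may be related to the hang: poll_read in the question returns Poll::Pending without ever touching cx. The contract for Poll::Pending is that the implementation arranges for the task's waker to be called once progress is possible; otherwise nothing tells the executor to poll again. I have not verified that this is the only cause here, so the following is just a standard-library sketch of the pattern. ReadyOnThirdPoll, ThreadWaker and block_on are hypothetical names written for this example, and block_on is a toy executor, not tokio's.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical stand-in for a reader that becomes ready on its third poll,
/// mirroring the `i >= 3` counter in the question.
struct ReadyOnThirdPoll {
    polls: u32,
}

impl Future for ReadyOnThirdPoll {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        self.polls += 1;
        if self.polls >= 3 {
            Poll::Ready(self.polls)
        } else {
            // Request another poll before returning Pending. Without this
            // call, the executor below would park forever.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor for a single future: poll it, then park until woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let polls = block_on(ReadyOnThirdPoll { polls: 0 });
    println!("ready after {} polls", polls);
}
```

In the question's poll_read, the equivalent change would be calling cx.waker().wake_by_ref() in the branch that returns Poll::Pending, or simply returning the data on the first poll.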