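PS: The answer below was helpful, but it is not the answer I need. I have a new problem, so I edited the question.
I'm trying to write a custom transporter for the hyper HTTP crate so that I can transfer HTTP packets in my own way.
Hyper's HTTP client can be passed a custom Connect (https://docs.rs/hyper/0.14.2/hyper/client/connect/trait.Connect.html) here:
pub fn build<C, B>(&self, connector: C) -> Client<C, B>
where
    C: Connect + Clone,
    B: HttpBody + Send,
    B::Data: Send,
If we look at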
impl<S, T> Connect for S
where
    S: Service<Uri, Response = T> + Send + 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    S::Future: Unpin + Send,
    T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
the type T, which is the type of Response, must implement AsyncRead + AsyncWrite, so I chose type Response = Cursor<Vec<u8>>.
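One thing that may be worth checking before wrapping a Cursor<Vec<u8>>: a cursor keeps a single position that reads and writes share, so writing the request into it moves the point from which the next read starts. The std-only sketch below illustrates this with the synchronous Read and Write traits. `DuplexBuf` and both function names are hypothetical helpers made up for this illustration; they are not part of hyper or tokio.

```rust
use std::io::{Cursor, Read, Write};

/// Hypothetical in-memory duplex buffer: `incoming` holds the bytes the
/// client will read, `outgoing` collects the bytes the client writes.
pub struct DuplexBuf {
    incoming: Cursor<Vec<u8>>,
    outgoing: Vec<u8>,
}

impl DuplexBuf {
    pub fn new(canned_response: Vec<u8>) -> Self {
        DuplexBuf {
            incoming: Cursor::new(canned_response),
            outgoing: Vec::new(),
        }
    }

    /// Everything written so far (for example, the serialized request).
    pub fn written(&self) -> &[u8] {
        &self.outgoing
    }
}

impl Read for DuplexBuf {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // Reads only ever touch the incoming side.
        self.incoming.read(buf)
    }
}

impl Write for DuplexBuf {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Writes only ever touch the outgoing side.
        self.outgoing.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

/// Shows the single-cursor pitfall: the write overwrites the first bytes
/// and advances the shared position, so the read starts after them.
pub fn read_after_write_on_single_cursor() -> Vec<u8> {
    let mut c = Cursor::new(b"RESPONSE".to_vec());
    c.write_all(b"GET").unwrap(); // buffer is now "GETPONSE", position 3
    let mut rest = Vec::new();
    c.read_to_end(&mut rest).unwrap(); // reads from position 3 onward
    rest
}
```

With a single cursor the reader sees only the tail of the response, whereas the two-buffer layout keeps the canned response intact no matter what is written.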
Here is my custom transporter, with a Response of type std::io::Cursor wrapped in a CustomResponse so that I can implement AsyncWrite and AsyncRead for it:
use hyper::service::Service;
use core::task::{Context, Poll};
use core::future::Future;
use std::pin::Pin;
use std::io::Cursor;
use hyper::client::connect::{Connection, Connected};
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Clone)]
pub struct CustomTransporter;
unsafe impl Send for CustomTransporter {}
impl CustomTransporter {
    pub fn new() -> CustomTransporter {
        CustomTransporter{}
    }
}

impl Connection for CustomTransporter {
    fn connected(&self) -> Connected {
        Connected::new()
    }
}

pub struct CustomResponse {
    //w: Cursor<Vec<u8>>,
    v: Vec<u8>,
    i: i32
}

unsafe impl Send for CustomResponse {
}

impl Connection for CustomResponse {
    fn connected(&self) -> Connected {
        println!("connected");
        Connected::new()
    }
}

impl AsyncRead for CustomResponse {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>
    ) -> Poll<std::io::Result<()>> {
        self.i+=1;
        if self.i >=3 {
            println!("poll_read for buf size {}", buf.capacity());
            buf.put_slice(self.v.as_slice());
            println!("did poll_read");
            Poll::Ready(Ok(()))
        } else {
            println!("poll read pending, i={}", self.i);
            Poll::Pending
        }
    }
}

impl AsyncWrite for CustomResponse {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8]
    ) -> Poll<Result<usize, std::io::Error>>{
        //let v = vec!();
        println!("poll_write____");
        let s = match std::str::from_utf8(buf) {
            Ok(v) => v,
            Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
        };
        println!("result: {}, size: {}, i: {}", s, s.len(), self.i);
        if self.i>=0{
            //r
            Poll::Ready(Ok(s.len()))
        }else{
            println!("poll_write pending");
            Poll::Pending
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), std::io::Error>> {
        println!("poll_flush");
        if self.i>=0{
            println!("DID poll_flush");
            Poll::Ready(Ok(()))
        }else{
            println!("poll_flush pending");
            Poll::Pending
        }
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), std::io::Error>>
    {
        println!("poll_shutdown");
        Poll::Ready(Ok(()))
    }
}

impl Service<hyper::Uri> for CustomTransporter {
    type Response = CustomResponse;
    type Error = hyper::http::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        println!("poll_ready");
        Poll::Ready(Ok(()))
        //Poll::Pending
    }

    fn call(&mut self, req: hyper::Uri) -> Self::Future {
        println!("call");
        // create the body
        let body: Vec<u8> = "HTTP/1.1 200 OK\nDate: Mon, 27 Jul 2009 12:28:53 GMT\nServer: Apache/2.2.14 (Win32)\nLast-Modified: Wed, 22 Jul 2009 19:15:56 GMT\nContent-Length: 88\nContent-Type: text/html\nConnection: Closed<html><body><h1>Hello, World!</h1></body></html>".as_bytes()
            .to_owned();
        // Create the HTTP response
        let resp = CustomResponse{
            //w: Cursor::new(body),
            v: body,
            i: 0
        };
        // create a response in a future.
        let fut = async move{
            Ok(resp)
        };
        println!("gonna return from call");
        // Return the response as an immediate future
        Box::pin(fut)
    }
}
Then I use it like this:
let connector = CustomTransporter::new();
let client: Client<CustomTransporter, hyper::Body> = Client::builder().build(connector);
let mut res = client.get(url).await.unwrap();
However, it gets stuck: hyper never reads my response, although it does write the GET request to it.
Here is a complete project for testing: https://github.com/lzunsec/rust_hyper_custom_transporter/blob/39cd036fc929057d975a71969ccbe97312543061/src/custom_req.rs
Run it like this:
cargo run http://google.com
I cannot simply implement Send for Future, and I cannot change Future through a wrapper. What should I do here?
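It looks like the problem is that your Service::Future is missing the Send constraint. The future returned by call is already Send, so it works with this simple change: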
impl Service<hyper::Uri> for CustomTransporter {
    type Response = CustomResponse;
    type Error = hyper::http::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
    //                                                                              ^^^^
    ...
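To illustrate why the marker matters: a boxed `dyn Future` trait object is only `Send` when the type says so, and the quoted Connect impl requires `S::Future: Unpin + Send`. The following std-only sketch mirrors the shape of the associated type. `BoxSendFuture`, `require_send`, `make_future`, `poll_once` and `run_on_other_thread` are hypothetical names introduced for this illustration, not hyper API.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Same shape as the `Service::Future` type above, including `+ Send`.
type BoxSendFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

/// Hypothetical helper: compiles only when `T` is `Send`.
fn require_send<T: Send>(value: T) -> T {
    value
}

/// Boxes an async block as a `Send` trait object. Dropping `+ Send` from
/// the alias would make the `require_send` call fail to compile.
pub fn make_future(body: Vec<u8>) -> BoxSendFuture<Result<Vec<u8>, String>> {
    let fut: BoxSendFuture<Result<Vec<u8>, String>> =
        Box::pin(async move { Ok::<_, String>(body) });
    require_send(fut)
}

/// Waker that does nothing; enough to poll a future a single time.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the boxed future exactly once.
pub fn poll_once<T>(fut: &mut BoxSendFuture<T>) -> Poll<T> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    fut.as_mut().poll(&mut cx)
}

/// Moves the future to another thread and polls it there, which is
/// only allowed because the trait object is `Send`.
pub fn run_on_other_thread(
    mut fut: BoxSendFuture<Result<Vec<u8>, String>>,
) -> Poll<Result<Vec<u8>, String>> {
    std::thread::spawn(move || poll_once(&mut fut))
        .join()
        .unwrap()
}
```

The async block itself is `Send` here because it only captures a `Vec<u8>`; the marker in the type alias is what preserves that fact once the future is boxed as a trait object.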
There are some other errors in your code: an un-inferred vec!(), self: Pin<...> missing mut, CustomResponse should be pub...
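Another detail that looks off, independent of the compile errors: the canned response in call separates lines with a bare \n, has no empty line between the headers and the body, and declares Content-Length: 88 although the body shown is 48 bytes long. HTTP/1.1 messages are specified with CRLF line endings and an empty line terminating the header section. Below is a minimal std-only sketch of building such a response; `build_response` is a hypothetical helper, and whether this alone changes hyper's behaviour here is an assumption I have not verified.

```rust
/// Hypothetical helper: builds a minimal HTTP/1.1 response with CRLF line
/// endings, an empty line before the body, and a computed Content-Length.
pub fn build_response(body: &str) -> Vec<u8> {
    let mut out = String::new();
    out.push_str("HTTP/1.1 200 OK\r\n");
    out.push_str("Content-Type: text/html\r\n");
    // Content-Length is the body length in bytes, not in characters.
    out.push_str(&format!("Content-Length: {}\r\n", body.len()));
    out.push_str("Connection: close\r\n");
    // The empty line marks the end of the header section.
    out.push_str("\r\n");
    out.push_str(body);
    out.into_bytes()
}
```

Calling `build_response("<html><body><h1>Hello, World!</h1></body></html>")` yields a message whose Content-Length header matches the 48-byte body.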
You can specify the B of client by letting it be inferred from a type annotation:
let client: Client<CustomTransporter, hyper::Body> = Client::builder().build(connector);
Or by using the turbofish operator on build:
let client = Client::builder().build::<CustomTransporter, hyper::Body>(connector);
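The two spellings are equivalent ways of naming a type parameter that appears only in the return type. Here is a std-only sketch of the same situation; `MiniClient`, `build` and `both_forms` are hypothetical stand-ins for illustration, not hyper's types.

```rust
use std::marker::PhantomData;

/// Hypothetical stand-in for a client generic over a connector `C`
/// and a body type `B`.
pub struct MiniClient<C, B> {
    pub connector: C,
    _body: PhantomData<B>,
}

/// Hypothetical stand-in for `Builder::build`: `B` appears only in the
/// return type, so the compiler cannot infer it from the argument.
pub fn build<C, B>(connector: C) -> MiniClient<C, B> {
    MiniClient {
        connector,
        _body: PhantomData,
    }
}

pub fn both_forms() -> (MiniClient<u8, String>, MiniClient<u8, String>) {
    // Form 1: a type annotation on the binding drives inference.
    let a: MiniClient<u8, String> = build(1u8);
    // Form 2: the turbofish names the parameters at the call site.
    let b = build::<u8, String>(2u8);
    (a, b)
}
```

Writing `let c = build(3u8);` with neither form would be rejected, since nothing determines `B`.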
I don't know enough about creating custom hyper transports to know whether it is functional, but these fixes make it compile. Hopefully it helps you make progress.
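One possible reason for the hang, offered as a guess rather than a verified diagnosis: the poll_read in the question returns Poll::Pending without arranging for the task to be woken. Under the Future contract, a task that returns Pending is only polled again after its waker has been called, so a Pending with no wake-up can stall forever. The std-only sketch below shows the pattern with a hypothetical `CountdownFuture`, a `StuckFuture` for contrast, and a tiny hand-written `block_on`; none of these are hyper or tokio APIs.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that is pending for its first `remaining` polls.
pub struct CountdownFuture {
    pub remaining: u32,
}

impl Future for CountdownFuture {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            return Poll::Ready("done");
        }
        self.remaining -= 1;
        // Ask to be polled again. Without this call nothing would
        // re-poll the task and it would appear stuck.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Hypothetical future that returns Pending and never wakes its task.
pub struct StuckFuture;

impl Future for StuckFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

/// Waker that counts how many times it was woken.
struct CountingWake(AtomicUsize);

impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Minimal executor: polls again only if the future woke itself during
/// the previous poll. Returns `None` where a real executor would hang.
pub fn block_on<F: Future>(fut: F) -> Option<F::Output> {
    let mut fut = Box::pin(fut);
    let wake = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(wake.clone());
    let mut cx = Context::from_waker(&waker);
    loop {
        let before = wake.0.load(Ordering::SeqCst);
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return Some(value),
            // Pending without a wake-up: nobody will poll this again.
            Poll::Pending if wake.0.load(Ordering::SeqCst) == before => return None,
            Poll::Pending => {}
        }
    }
}
```

In an AsyncRead implementation the equivalent would be to call `cx.waker().wake_by_ref()` before returning Pending, or to store a clone of the waker and call it once data becomes available.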