use*_*375 3 rust hyper rust-tokio
我正在尝试调整Hyper basic 客户端示例以同时获取多个 URL。
这是我目前拥有的代码:
extern crate futures;
extern crate hyper;
extern crate tokio_core;
use std::io::{self, Write};
use std::iter;
use futures::{Future, Stream};
use hyper::Client;
use tokio_core::reactor::Core;
fn get_url() {
let mut core = Core::new().unwrap();
let client = Client::new(&core.handle());
let uris: Vec<_> = iter::repeat("http://httpbin.org/ip".parse().unwrap()).take(50).collect();
for uri in uris {
let work = client.get(uri).and_then(|res| {
println!("Response: {}", res.status());
res.body().for_each(|chunk| {
io::stdout()
.write_all(&chunk)
.map_err(From::from)
})
});
core.run(work).unwrap();
}
}
fn main() {
get_url();
}
Run Code Online (Sandbox Code Playgroud)
它似乎没有同时进行(需要很长时间才能完成),我是否以错误的方式将工作交给了核心?
我是否以错误的方式将工作交给核心?
是的,您正在向 Tokio 发出一个请求,并要求该请求在开始下一个请求之前完成。您已经采用了异步代码并强制它是顺序的。
您需要给反应堆一个单一的未来,它将执行不同类型的并发工作。
use futures::prelude::*;
use hyper::{body, client::Client};
use std::{
io::{self, Write},
iter,
};
use tokio;
const N_CONCURRENT: usize = 1;
#[tokio::main]
async fn main() {
let client = Client::new();
let uri = "http://httpbin.org/ip".parse().unwrap();
let uris = iter::repeat(uri).take(50);
stream::iter(uris)
.map(move |uri| client.get(uri))
.buffer_unordered(N_CONCURRENT)
.then(|res| async {
let res = res.expect("Error making request: {}");
println!("Response: {}", res.status());
body::to_bytes(res).await.expect("Error reading body")
})
.for_each(|body| async move {
io::stdout().write_all(&body).expect("Error writing body");
})
.await;
}
Run Code Online (Sandbox Code Playgroud)
设置为 1时N_CONCURRENT:
real 1.119 1119085us
user 0.012 12021us
sys 0.011 11459us
Run Code Online (Sandbox Code Playgroud)
并设置为 10:
real 0.216 216285us
user 0.014 13596us
sys 0.021 20640us
Run Code Online (Sandbox Code Playgroud)
Cargo.toml
[dependencies]
futures = "0.3.17"
hyper = { version = "0.14.13", features = ["client", "http1", "tcp"] }
tokio = { version = "1.12.0", features = ["full"] }
Run Code Online (Sandbox Code Playgroud)
use futures::{stream, Future, Stream}; // 0.1.25
use hyper::Client; // 0.12.23
use std::{
io::{self, Write},
iter,
};
use tokio; // 0.1.15
const N_CONCURRENT: usize = 1;
fn main() {
let client = Client::new();
let uri = "http://httpbin.org/ip".parse().unwrap();
let uris = iter::repeat(uri).take(50);
let work = stream::iter_ok(uris)
.map(move |uri| client.get(uri))
.buffer_unordered(N_CONCURRENT)
.and_then(|res| {
println!("Response: {}", res.status());
res.into_body()
.concat2()
.map_err(|e| panic!("Error collecting body: {}", e))
})
.for_each(|body| {
io::stdout()
.write_all(&body)
.map_err(|e| panic!("Error writing: {}", e))
})
.map_err(|e| panic!("Error making request: {}", e));
tokio::run(work);
}
Run Code Online (Sandbox Code Playgroud)
设置为 1时N_CONCURRENT:
[dependencies]
futures = "0.3.17"
hyper = { version = "0.14.13", features = ["client", "http1", "tcp"] }
tokio = { version = "1.12.0", features = ["full"] }
Run Code Online (Sandbox Code Playgroud)
并设置为 10:
use futures::{stream, Future, Stream}; // 0.1.25
use hyper::Client; // 0.12.23
use std::{
io::{self, Write},
iter,
};
use tokio; // 0.1.15
const N_CONCURRENT: usize = 1;
fn main() {
let client = Client::new();
let uri = "http://httpbin.org/ip".parse().unwrap();
let uris = iter::repeat(uri).take(50);
let work = stream::iter_ok(uris)
.map(move |uri| client.get(uri))
.buffer_unordered(N_CONCURRENT)
.and_then(|res| {
println!("Response: {}", res.status());
res.into_body()
.concat2()
.map_err(|e| panic!("Error collecting body: {}", e))
})
.for_each(|body| {
io::stdout()
.write_all(&body)
.map_err(|e| panic!("Error writing: {}", e))
})
.map_err(|e| panic!("Error making request: {}", e));
tokio::run(work);
}
Run Code Online (Sandbox Code Playgroud)
也可以看看: