csp*_*tta 5 multithreading rust hyper rust-tokio
我有多个线程执行一些繁重的操作,我需要在工作中使用客户端。我使用 Hyper v0.11 作为 HTTP 客户端,我想重用连接,所以我需要共享hyper::Client连接以保持打开连接(在keep-alive模式下)。
客户端不可在线程之间共享(它没有实现Sync或Send)。这是我尝试执行的代码的小片段:
let mut core = Core::new().expect("Create Client Event Loop");
let handle = core.handle();
let remote = core.remote();
let client = Client::new(&handle.clone());
thread::spawn(move || {
// intensive operations...
let response = &client.get("http://google.com".parse().unwrap()).and_then(|res| {
println!("Response: {}", res.status());
Ok(())
});
remote.clone().spawn(|_| {
response.map(|_| { () }).map_err(|_| { () })
});
// more intensive operations...
});
core.run(futures::future::empty::<(), ()>()).unwrap();
Run Code Online (Sandbox Code Playgroud)
此代码无法编译:
let mut core = Core::new().expect("Create Client Event Loop");
let handle = core.handle();
let remote = core.remote();
let client = Client::new(&handle.clone());
thread::spawn(move || {
// intensive operations...
let response = &client.get("http://google.com".parse().unwrap()).and_then(|res| {
println!("Response: {}", res.status());
Ok(())
});
remote.clone().spawn(|_| {
response.map(|_| { () }).map_err(|_| { () })
});
// more intensive operations...
});
core.run(futures::future::empty::<(), ()>()).unwrap();
Run Code Online (Sandbox Code Playgroud)
有没有办法从不同的线程或其他方法重用相同的客户端?
简短的回答是否定的,但这样更好。
每个Client对象都拥有一个连接池。下面是 0.11.0 版本中 Hyper 的Pool定义:
pub struct Pool<T> {
inner: Rc<RefCell<PoolInner<T>>>,
}
Run Code Online (Sandbox Code Playgroud)
正如inner使用 an 进行引用计数Rc并在运行时使用 进行借用检查一样RefCell,池肯定不是线程安全的。当您尝试将其移动Client到新线程时,该对象将持有一个位于另一个线程中的池,这可能是数据竞争的来源。
这种实现是可以理解的。尝试跨多个线程重用 HTTP 连接并不常见,因为它需要同步访问主要是 I/O 密集型的资源。这与 Tokio 的异步特性非常吻合。实际上更合理的做法是在同一个线程中执行多个请求,并让 Tokio 的核心负责异步发送消息和接收消息,而无需按顺序等待每个响应。此外,计算密集型任务可以由 CPU 池执行futures_cpupool。考虑到这一点,下面的代码可以正常工作:
extern crate tokio_core;
extern crate hyper;
extern crate futures;
extern crate futures_cpupool;
use tokio_core::reactor::Core;
use hyper::client::Client;
use futures::Future;
use futures_cpupool::CpuPool;
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = Client::new(&handle.clone());
let pool = CpuPool::new(1);
println!("Begin!");
let req = client.get("http://google.com".parse().unwrap())
.and_then(|res| {
println!("Response: {}", res.status());
Ok(())
});
let intensive = pool.spawn_fn(|| {
println!("I'm working hard!!!");
std::thread::sleep(std::time::Duration::from_secs(1));
println!("Phew!");
Ok(())
});
let task = req.join(intensive)
.map(|_|{
println!("End!");
});
core.run(task).unwrap();
}
Run Code Online (Sandbox Code Playgroud)
如果太晚没有收到响应,输出将是:
pub struct Pool<T> {
inner: Rc<RefCell<PoolInner<T>>>,
}
Run Code Online (Sandbox Code Playgroud)
如果您有多个任务在单独的线程中运行,则问题将变得开放,因为有多种体系结构可行。其中之一是将所有通信委托给单个参与者,从而要求所有其他工作线程将数据发送给它。或者,您可以为每个工作线程分配一个客户端对象,从而也拥有单独的连接池。
| 归档时间: |
|
| 查看次数: |
1935 次 |
| 最近记录: |