Tom*_*mas 10 multithreading rust rayon
我正在尝试使用Rayon来优化我的功能par_iter().
单线程版本是这样的:
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
let result = txs.iter().map(|tx| {
tx.verify_and_store(store)
}).collect();
...
}
Run Code Online (Sandbox Code Playgroud)
每个Store实例只能由一个线程使用,但Store可以同时使用多个实例,所以我可以通过clone-ing 来实现这个多线程store:
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
let result = txs.par_iter().map(|tx| {
let mut local_store = store.clone();
tx.verify_and_store(&mut local_store)
}).collect();
...
}
Run Code Online (Sandbox Code Playgroud)
但是,这会store在每次迭代时克隆,这太慢了.我想为每个线程使用一个商店实例.
人造丝可以吗?或者我应该使用手动线程和工作队列?
可以使用线程局部变量来确保local_store在给定线程中不会多次创建.
例如,这编译(完整源代码):
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
use std::cell::RefCell;
thread_local!(static STORE: RefCell<Option<Store>> = RefCell::new(None));
let mut result = Vec::new();
txs.par_iter().map(|tx| {
STORE.with(|cell| {
let mut local_store = cell.borrow_mut();
if local_store.is_none() {
*local_store = Some(store.clone());
}
tx.verify_and_store(local_store.as_mut().unwrap())
})
}).collect_into(&mut result);
}
Run Code Online (Sandbox Code Playgroud)
但是,此代码存在两个问题.一,如果完成store时需要做某些事情的克隆par_iter(),比如刷新缓冲区,就不会发生 - Drop只有当Rayon的工作线程退出时才会调用它们,即使这样也无法保证.
第二个也是更严重的问题是,store每个工作线程只创建一次克隆.如果Rayon缓存其线程池(我相信它确实如此),这意味着一个不相关的后续调用verify_and_store将继续使用最后已知的克隆store,这可能与当前存储无关.
这可以通过使代码复杂化来纠正:
将克隆的变量存储在一个Mutex<Option<...>>而不是Option,以便它们可以被调用的线程访问par_iter().这将在每次访问时产生互斥锁,但锁将是无争议的,因此便宜.
Arc在互斥锁周围使用,以便在向量中收集对已创建的商店克隆的引用.此向量用于通过None在迭代完成后将它们重置为清理存储.
将整个调用包装在一个不相关的互斥锁中,这样两个并行调用verify_and_store就不会最终看到彼此的存储克隆.(如果在迭代之前创建并安装了新的线程池,这可能是可以避免的.)希望这个序列化不会影响性能verify_and_store,因为每个调用都将使用整个线程池.
结果并不漂亮,但它编译,只使用安全的代码,似乎工作:
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
use std::sync::{Arc, Mutex};
type SharedStore = Arc<Mutex<Option<Store>>>;
lazy_static! {
static ref STORE_CLONES: Mutex<Vec<SharedStore>> = Mutex::new(Vec::new());
static ref NO_REENTRY: Mutex<()> = Mutex::new(());
}
thread_local!(static STORE: SharedStore = Arc::new(Mutex::new(None)));
let mut result = Vec::new();
let _no_reentry = NO_REENTRY.lock();
txs.par_iter().map({
|tx| {
STORE.with(|arc_mtx| {
let mut local_store = arc_mtx.lock().unwrap();
if local_store.is_none() {
*local_store = Some(store.clone());
STORE_CLONES.lock().unwrap().push(arc_mtx.clone());
}
tx.verify_and_store(local_store.as_mut().unwrap())
})
}
}).collect_into(&mut result);
let mut store_clones = STORE_CLONES.lock().unwrap();
for store in store_clones.drain(..) {
store.lock().unwrap().take();
}
}
Run Code Online (Sandbox Code Playgroud)
老问题,但我觉得答案需要重新审视。一般来说,有两种方法:
使用map_with. 每次线程从另一个线程窃取工作项时,这都会克隆。这可能会克隆比线程更多的存储,但它应该相当低。如果克隆太昂贵,您可以增加大小 rayon 将使用with_min_len.
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
let result = txs.iter().map_with(|| store.clone(), |store, tx| {
tx.verify_and_store(store)
}).collect();
...
}
Run Code Online (Sandbox Code Playgroud)
或者使用范围的ThreadLocal从thread_local箱。这将确保您只使用与线程一样多的对象,并且一旦ThreadLocal对象超出范围,它们就会被销毁。
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
let tl = ThreadLocal::new();
let result = txs.iter().map(|tx| {
let store = tl.get_or(|| Box::new(RefCell::new(store.clone)));
tx.verify_and_store(store.get_mut());
}).collect();
...
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
580 次 |
| 最近记录: |