And*_*rew 19 multithreading rust
我想在线程之间有一个共享的结构。该结构体有许多从未被修改的字段和 a HashMap,即。我不想HashMap为单个更新/删除锁定整个,所以我HashMap看起来像HashMap<u8, Mutex<u8>>. 这有效,但没有意义,因为无论如何线程都会锁定整个地图。
这是这个工作版本,没有线程;我认为这个例子没有必要。
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
fn main() {
let s = Arc::new(Mutex::new(S::new()));
let z = s.clone();
let _ = z.lock().unwrap();
}
struct S {
x: HashMap<u8, Mutex<u8>>, // other non-mutable fields
}
impl S {
pub fn new() -> S {
S {
x: HashMap::default(),
}
}
}
Run Code Online (Sandbox Code Playgroud)
这有可能吗?我在文档中遗漏了什么明显的东西吗?
我一直在努力让这个工作,但我不知道如何。基本上我看到的每个例子都有一个Mutex(或RwLock,或类似的东西)保护内在价值。
She*_*ter 17
我不明白你的请求是怎么可能的,至少在没有一些非常聪明的无锁数据结构的情况下是不可能的;如果多个线程需要插入散列到同一位置的新值,会发生什么?
在以前的工作中,我使用了一个RwLock<HashMap<K, Mutex<V>>>. 将值插入散列时,您会获得一个短期的排他锁。其余时间,您可以有多个线程,将读取器锁定到HashMap,从而锁定给定元素。如果他们需要改变数据,他们可以获得对Mutex.
下面是一个例子:
use std::{
collections::HashMap,
sync::{Arc, Mutex, RwLock},
thread,
time::Duration,
};
fn main() {
let data = Arc::new(RwLock::new(HashMap::new()));
let threads: Vec<_> = (0..10)
.map(|i| {
let data = Arc::clone(&data);
thread::spawn(move || worker_thread(i, data))
})
.collect();
for t in threads {
t.join().expect("Thread panicked");
}
println!("{:?}", data);
}
fn worker_thread(id: u8, data: Arc<RwLock<HashMap<u8, Mutex<i32>>>>) {
loop {
// Assume that the element already exists
let map = data.read().expect("RwLock poisoned");
if let Some(element) = map.get(&id) {
let mut element = element.lock().expect("Mutex poisoned");
// Perform our normal work updating a specific element.
// The entire HashMap only has a read lock, which
// means that other threads can access it.
*element += 1;
thread::sleep(Duration::from_secs(1));
return;
}
// If we got this far, the element doesn't exist
// Get rid of our read lock and switch to a write lock
// You want to minimize the time we hold the writer lock
drop(map);
let mut map = data.write().expect("RwLock poisoned");
// We use HashMap::entry to handle the case where another thread
// inserted the same key while where were unlocked.
thread::sleep(Duration::from_millis(50));
map.entry(id).or_insert_with(|| Mutex::new(0));
// Let the loop start us over to try again
}
}
Run Code Online (Sandbox Code Playgroud)
这在我的机器上运行大约需要 2.7 秒,即使它启动了 10 个线程,每个线程在保持元素数据的排他锁的同时等待 1 秒。
然而,这个解决方案并非没有问题。当一个主锁存在大量争用时,获得写锁可能需要一段时间并完全杀死并行性。
在这种情况下,您可以切换到RwLock<HashMap<K, Arc<Mutex<V>>>>. 一旦获得了读锁或写锁,就可以克隆Arc值的 ,返回它并解锁哈希图。
下一步是使用像arc-swap这样的板条箱,它说:
然后将锁定,克隆 [
RwLock<Arc<T>>] 并解锁。这会受到 CPU 级争用(在锁和 的引用计数上Arc)的影响,这使得它相对较慢。根据实现,更新可能会被稳定流入的读者阻止任意长时间。的
ArcSwap可以用来代替,其解决了上述问题,并具有比更好的性能特性RwLock,无论是在争辩和非竞争方案。
我经常提倡执行某种更智能的算法。例如,您可以启动 N 个线程,每个线程都有自己的HashMap. 然后,分片当中的工作。例如,对于上面的简单示例,您可以使用id % N_THREADS。还有一些复杂的分片方案取决于您的数据。
正如 Go 已经很好地宣传了:不要通过共享内存进行通信;相反,通过通信共享内存。
假设数据的键可以映射到u8
你可以有Arc<HashMap<u8,Mutex<HashMap<Key,Value>>>
当您初始化数据结构时,您会在将其放入 Arc 之前填充所有第一级地图(初始化后它将是不可变的)
当您想要从地图中获取值时,您将需要执行双重获取,例如:
data.get(&map_to_u8(&key)).unwrap().lock().expect("poison").get(&key)
其中unwrap是安全的,因为我们用所有值初始化了第一个映射。
在地图上写下类似的内容:
data.get(&map_to_u8(id)).unwrap().lock().expect("poison").entry(id).or_insert_with(|| value);
很容易看出争用减少了,因为我们现在有 256 个互斥体,并且多个线程请求相同互斥体的概率很低。
@Shepmaster 具有 100 个线程的示例在我的机器上大约需要 10 秒,下面的示例需要 1 秒多一点。
use std::{
collections::HashMap,
sync::{Arc, Mutex, RwLock},
thread,
time::Duration,
};
fn main() {
let mut inner = HashMap::new( );
for i in 0..=u8::max_value() {
inner.insert(i, Mutex::new(HashMap::new()));
}
let data = Arc::new(inner);
let threads: Vec<_> = (0..100)
.map(|i| {
let data = Arc::clone(&data);
thread::spawn(move || worker_thread(i, data))
})
.collect();
for t in threads {
t.join().expect("Thread panicked");
}
println!("{:?}", data);
}
fn worker_thread(id: u8, data: Arc<HashMap<u8,Mutex<HashMap<u8,Mutex<i32>>>>> ) {
loop {
// first unwrap is safe to unwrap because we populated for every `u8`
if let Some(element) = data.get(&id).unwrap().lock().expect("poison").get(&id) {
let mut element = element.lock().expect("Mutex poisoned");
// Perform our normal work updating a specific element.
// The entire HashMap only has a read lock, which
// means that other threads can access it.
*element += 1;
thread::sleep(Duration::from_secs(1));
return;
}
// If we got this far, the element doesn't exist
// Get rid of our read lock and switch to a write lock
// You want to minimize the time we hold the writer lock
// We use HashMap::entry to handle the case where another thread
// inserted the same key while where were unlocked.
thread::sleep(Duration::from_millis(50));
data.get(&id).unwrap().lock().expect("poison").entry(id).or_insert_with(|| Mutex::new(0));
// Let the loop start us over to try again
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
10209 次 |
| 最近记录: |