是否可以在不锁定整个 HashMap 的情况下在线程之间共享 HashMap?

And*_*rew 19 multithreading rust

我想在线程之间有一个共享的结构。该结构体有许多从未被修改的字段和 a HashMap,即。我不想HashMap为单个更新/删除锁定整个,所以我HashMap看起来像HashMap<u8, Mutex<u8>>. 这有效,但没有意义,因为无论如何线程都会锁定整个地图。

这是这个工作版本,没有线程;我认为这个例子没有必要。

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

fn main() {
    let s = Arc::new(Mutex::new(S::new()));
    let z = s.clone();
    let _ = z.lock().unwrap();
}

struct S {
    x: HashMap<u8, Mutex<u8>>, // other non-mutable fields
}

impl S {
    pub fn new() -> S {
        S {
            x: HashMap::default(),
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

操场

这有可能吗?我在文档中遗漏了什么明显的东西吗?

我一直在努力让这个工作,但我不知道如何。基本上我看到的每个例子都有一个Mutex(或RwLock,或类似的东西)保护内在价值。

She*_*ter 17

我不明白你的请求是怎么可能的,至少在没有一些非常聪明的无锁数据结构的情况下是不可能的;如果多个线程需要插入散列到同一位置的新值,会发生什么?

在以前的工作中,我使用了一个RwLock<HashMap<K, Mutex<V>>>. 将值插入散列时,您会获得一个短期的排他锁。其余时间,您可以有多个线程,将读取器锁定到HashMap,从而锁定给定元素。如果他们需要改变数据,他们可以获得对Mutex.

下面是一个例子:

use std::{
    collections::HashMap,
    sync::{Arc, Mutex, RwLock},
    thread,
    time::Duration,
};

fn main() {
    let data = Arc::new(RwLock::new(HashMap::new()));

    let threads: Vec<_> = (0..10)
        .map(|i| {
            let data = Arc::clone(&data);
            thread::spawn(move || worker_thread(i, data))
        })
        .collect();

    for t in threads {
        t.join().expect("Thread panicked");
    }

    println!("{:?}", data);
}

fn worker_thread(id: u8, data: Arc<RwLock<HashMap<u8, Mutex<i32>>>>) {
    loop {
        // Assume that the element already exists
        let map = data.read().expect("RwLock poisoned");

        if let Some(element) = map.get(&id) {
            let mut element = element.lock().expect("Mutex poisoned");

            // Perform our normal work updating a specific element.
            // The entire HashMap only has a read lock, which
            // means that other threads can access it.
            *element += 1;
            thread::sleep(Duration::from_secs(1));

            return;
        }

        // If we got this far, the element doesn't exist

        // Get rid of our read lock and switch to a write lock
        // You want to minimize the time we hold the writer lock
        drop(map);
        let mut map = data.write().expect("RwLock poisoned");

        // We use HashMap::entry to handle the case where another thread 
        // inserted the same key while where were unlocked.
        thread::sleep(Duration::from_millis(50));
        map.entry(id).or_insert_with(|| Mutex::new(0));
        // Let the loop start us over to try again
    }
}
Run Code Online (Sandbox Code Playgroud)

这在我的机器上运行大约需要 2.7 秒,即使它启动了 10 个线程,每个线程在保持元素数据的排他锁的同时等待 1 秒。

然而,这个解决方案并非没有问题。当一个主锁存在大量争用时,获得写锁可能需要一段时间并完全杀死并行性。

在这种情况下,您可以切换到RwLock<HashMap<K, Arc<Mutex<V>>>>. 一旦获得了读锁或写锁,就可以克隆Arc值的 ,返回它并解锁哈希图。

下一步是使用像arc-swap这样的板条箱,它说:

然后将锁定,克隆 [ RwLock<Arc<T>>] 并解锁。这会受到 CPU 级争用(在锁和 的引用计数上Arc)的影响,这使得它相对较慢。根据实现,更新可能会被稳定流入的读者阻止任意长时间。

ArcSwap可以用来代替,其解决了上述问题,并具有比更好的性能特性RwLock,无论是在争辩和非竞争方案。

我经常提倡执行某种更智能的算法。例如,您可以启动 N 个线程,每个线程都有自己的HashMap. 然后,分片当中的工作。例如,对于上面的简单示例,您可以使用id % N_THREADS。还有一些复杂的分片方案取决于您的数据。

正如 Go 已经很好地宣传了:不要通过共享内存进行通信;相反,通过通信共享内存


Ric*_*tta 9

假设数据的键可以映射到u8

你可以有Arc<HashMap<u8,Mutex<HashMap<Key,Value>>>

当您初始化数据结构时,您会在将其放入 Arc 之前填充所有第一级地图(初始化后它将是不可变的)

当您想要从地图中获取值时,您将需要执行双重获取,例如:

data.get(&map_to_u8(&key)).unwrap().lock().expect("poison").get(&key)

其中unwrap是安全的,因为我们用所有值初始化了第一个映射。

在地图上写下类似的内容:

data.get(&map_to_u8(id)).unwrap().lock().expect("poison").entry(id).or_insert_with(|| value);

很容易看出争用减少了,因为我们现在有 256 个互斥体,并且多个线程请求相同互斥体的概率很低。

@Shepmaster 具有 100 个线程的示例在我的机器上大约需要 10 秒,下面的示例需要 1 秒多一点。

use std::{
    collections::HashMap,
    sync::{Arc, Mutex, RwLock},
    thread,
    time::Duration,
};

fn main() {
    let mut inner = HashMap::new( );
    for i in 0..=u8::max_value() {
        inner.insert(i, Mutex::new(HashMap::new()));
    }
    let data = Arc::new(inner);

    let threads: Vec<_> = (0..100)
        .map(|i| {
            let data = Arc::clone(&data);
            thread::spawn(move || worker_thread(i, data))
        })
        .collect();

    for t in threads {
        t.join().expect("Thread panicked");
    }

    println!("{:?}", data);
}

fn worker_thread(id: u8, data: Arc<HashMap<u8,Mutex<HashMap<u8,Mutex<i32>>>>> ) {
    loop {

        // first unwrap is safe to unwrap because we populated for every `u8`
        if let Some(element) = data.get(&id).unwrap().lock().expect("poison").get(&id) {
            let mut element = element.lock().expect("Mutex poisoned");

            // Perform our normal work updating a specific element.
            // The entire HashMap only has a read lock, which
            // means that other threads can access it.
            *element += 1;
            thread::sleep(Duration::from_secs(1));

            return;
        }

        // If we got this far, the element doesn't exist

        // Get rid of our read lock and switch to a write lock
        // You want to minimize the time we hold the writer lock

        // We use HashMap::entry to handle the case where another thread
        // inserted the same key while where were unlocked.
        thread::sleep(Duration::from_millis(50));
        data.get(&id).unwrap().lock().expect("poison").entry(id).or_insert_with(|| Mutex::new(0));
        // Let the loop start us over to try again
    }
}

Run Code Online (Sandbox Code Playgroud)


jon*_*nas 5

也许您想考虑evmap

一个无锁、最终一致、并发的多值映射。

权衡是最终一致性:在作者刷新地图之前,读者不会看到更改。刷新是原子的,编写者决定何时执行刷新并向读者公开新数据。